Skip to content

Commit

Permalink
improved net client unit tester
Browse files Browse the repository at this point in the history
  • Loading branch information
Erik Sohns committed Dec 27, 2013
1 parent cdde726 commit 13d541e
Show file tree
Hide file tree
Showing 42 changed files with 1,210 additions and 1,194 deletions.
2 changes: 1 addition & 1 deletion README.md
2 changes: 1 addition & 1 deletion Yarp/README
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Welcome to the **Yarp** wiki!
Welcome to the **Yarp** project!

**Some notes from the (only) contributor**

Expand Down
41 changes: 30 additions & 11 deletions net/client/rpg_net_client_asynchconnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ RPG_Net_Client_AsynchConnector::RPG_Net_Client_AsynchConnector()
RPG_TRACE(ACE_TEXT("RPG_Net_Client_AsynchConnector::RPG_Net_Client_AsynchConnector"));

// init base class
if (inherited::open(false, // pass addresses
ACE_Proactor::instance(), // default proactor
false) == -1) // validate new connections
if (inherited::open(false, // pass addresses ?
NULL, // default proactor
true) == -1) // validate new connections ?
ACE_DEBUG((LM_ERROR,
ACE_TEXT("failed to ACE_Asynch_Connector::open(): \"%m\", continuing\n")));
}
Expand All @@ -60,22 +60,29 @@ RPG_Net_Client_AsynchConnector::make_handler(void)
return handler_out;
}

void
RPG_Net_Client_AsynchConnector::abort()
{
RPG_TRACE(ACE_TEXT("RPG_Net_Client_AsynchConnector::abort"));

if (inherited::cancel() == -1)
ACE_DEBUG((LM_ERROR,
ACE_TEXT("failed to ACE_Asynch_Connector::cancel(): \"%m\", continuing\n")));
}

void
RPG_Net_Client_AsynchConnector::connect(const ACE_INET_Addr& peer_address)
{
RPG_TRACE(ACE_TEXT("RPG_Net_Client_AsynchConnector::connect"));

int result = -1;
result = inherited::connect(peer_address, // remote SAP
ACE_sap_any_cast(const ACE_INET_Addr&), // local SAP
1, // re-use address (SO_REUSEADDR) ?
NULL); // ACT
int result = inherited::connect(peer_address, // remote SAP
ACE_sap_any_cast(const ACE_INET_Addr&), // local SAP
1, // re-use address (SO_REUSEADDR) ?
NULL); // ACT
if (result == -1)
{
ACE_TCHAR buffer[RPG_COMMON_BUFSIZE];
ACE_OS::memset(buffer,
0,
(RPG_COMMON_BUFSIZE * sizeof(ACE_TCHAR)));
ACE_OS::memset(buffer, 0, sizeof(buffer));
if (peer_address.addr_to_string(buffer,
sizeof(buffer)) == -1)
ACE_DEBUG((LM_ERROR,
Expand All @@ -85,3 +92,15 @@ RPG_Net_Client_AsynchConnector::connect(const ACE_INET_Addr& peer_address)
buffer));
} // end IF
}

int
RPG_Net_Client_AsynchConnector::validate_connection(const ACE_Asynch_Connect::Result& result_in,
const ACE_INET_Addr& remoteSAP_in,
const ACE_INET_Addr& localSAP_in)
{
RPG_TRACE(ACE_TEXT("RPG_Net_Client_AsynchConnector::validate_connection"));

return inherited::validate_connection(result_in,
remoteSAP_in,
localSAP_in);
}
5 changes: 5 additions & 0 deletions net/client/rpg_net_client_asynchconnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,13 @@ class RPG_Net_Client_Export RPG_Net_Client_AsynchConnector
virtual RPG_Net_AsynchStreamHandler_t* make_handler(void);

// implement RPG_Net_Client_IConnector
virtual void abort();
virtual void connect(const ACE_INET_Addr&);

virtual int validate_connection(const ACE_Asynch_Connect::Result&, // result
const ACE_INET_Addr&, // peer SAP
const ACE_INET_Addr&); // local SAP

private:
typedef ACE_Asynch_Connector<RPG_Net_AsynchStreamHandler_t> inherited;

Expand Down
10 changes: 10 additions & 0 deletions net/client/rpg_net_client_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ RPG_Net_Client_Connector::make_svc_handler(RPG_Net_StreamHandler_t*& handler_ino
return ((handler_inout == NULL) ? -1 : 0);
}

void
RPG_Net_Client_Connector::abort()
{
RPG_TRACE(ACE_TEXT("RPG_Net_Client_Connector::abort"));

if (inherited::close() == -1)
ACE_DEBUG((LM_ERROR,
ACE_TEXT("failed to ACE_Connector::close(): \"%m\", continuing\n")));
}

void
RPG_Net_Client_Connector::connect(const ACE_INET_Addr& peer_address)
{
Expand Down
1 change: 1 addition & 0 deletions net/client/rpg_net_client_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class RPG_Net_Client_Export RPG_Net_Client_Connector
virtual int make_svc_handler(RPG_Net_StreamHandler_t*&);

// implement RPG_Net_Client_IConnector
virtual void abort();
virtual void connect(const ACE_INET_Addr&);

private:
Expand Down
1 change: 1 addition & 0 deletions net/client/rpg_net_client_iconnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class RPG_Net_Client_IConnector
public:
virtual ~RPG_Net_Client_IConnector() {}

virtual void abort() = 0; // shutdown
virtual void connect(const ACE_INET_Addr&) = 0;
};

Expand Down
124 changes: 78 additions & 46 deletions net/rpg_net_common_tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,24 @@ tp_event_dispatcher_func(void* args_in)
ACE_DEBUG((LM_DEBUG,
ACE_TEXT("(%t) worker starting...\n")));

// ignore SIGPIPE: need this to continue gracefully after a client
// suddenly disconnects (i.e. application/system crash, etc...)
// --> specify ignore action
// *IMPORTANT NOTE*: don't actually need to keep this around after registration
// *WARNING*: do NOT restart system calls in this case (see manual)
ACE_Sig_Action no_sigpipe(static_cast<ACE_SignalHandler>(SIG_IGN),
ACE_Sig_Set(1), // mask of signals to be blocked while servicing
// --> block them all ! (except KILL off course...)
SA_SIGINFO); // flags
// (SA_RESTART | SA_SIGINFO)); // flags
ACE_Sig_Action original_action;
if (no_sigpipe.register_action(SIGPIPE, &original_action) == -1)
ACE_DEBUG((LM_DEBUG,
ACE_TEXT("failed to ACE_Sig_Action::register_action(SIGPIPE): \"%m\", continuing\n")));

// *IMPORTANT NOTE*: "The signal disposition is a per-process attribute: in a multithreaded
// application, the disposition of a particular signal is the same for
// all threads." (see man 7 signal)
// // ignore SIGPIPE: need this to continue gracefully after a client
// // suddenly disconnects (i.e. application/system crash, etc...)
// // --> specify ignore action
// // *IMPORTANT NOTE*: don't actually need to keep this around after registration
// // *NOTE*: do NOT restart system calls in this case (see manual)
// ACE_Sig_Action no_sigpipe(static_cast<ACE_SignalHandler>(SIG_IGN), // ignore action
// ACE_Sig_Set(1), // mask of signals to be blocked when servicing
// // --> block them all (bar KILL/STOP; see manual)
// SA_SIGINFO); // flags
//// (SA_RESTART | SA_SIGINFO)); // flags
// ACE_Sig_Action original_action;
// if (no_sigpipe.register_action(SIGPIPE, &original_action) == -1)
// ACE_DEBUG((LM_DEBUG,
// ACE_TEXT("failed to ACE_Sig_Action::register_action(SIGPIPE): \"%m\", continuing\n")));
//
int success = -1;
// handle any events...
if (use_reactor)
Expand All @@ -80,16 +83,14 @@ tp_event_dispatcher_func(void* args_in)
ACE_DEBUG((LM_ERROR,
ACE_TEXT("(%t) failed to handle events: \"%m\", aborting\n")));

// clean up
if (no_sigpipe.restore_action(SIGPIPE, original_action) == -1)
ACE_DEBUG((LM_DEBUG,
ACE_TEXT("failed to ACE_Sig_Action::restore_action(SIGPIPE): \"%m\", continuing\n")));
//// clean up
//if (no_sigpipe.restore_action(SIGPIPE, original_action) == -1)
// ACE_DEBUG((LM_DEBUG,
// ACE_TEXT("failed to ACE_Sig_Action::restore_action(SIGPIPE): \"%m\", continuing\n")));

ACE_DEBUG((LM_DEBUG,
ACE_TEXT("(%t) worker leaving...\n")));

// *PORTABILITY*
// *TODO*
return (success == 0 ? NULL : NULL);
}

Expand Down Expand Up @@ -1220,12 +1221,10 @@ RPG_Net_Common_Tools::setSocketBuffer(const ACE_HANDLE& handle_in,
size /= 2;

if (size_in % 2)
{ // debug info
ACE_DEBUG((LM_WARNING,
ACE_TEXT("requested %s buffer size %u is ODD...\n"),
((option_in == SO_SNDBUF) ? ACE_TEXT("SO_SNDBUF") : ACE_TEXT("SO_RCVBUF")),
size_in));
} // end IF
} // end IF

if (ACE_OS::setsockopt(handle_in,
Expand All @@ -1234,10 +1233,12 @@ RPG_Net_Common_Tools::setSocketBuffer(const ACE_HANDLE& handle_in,
reinterpret_cast<const char*>(&size),
sizeof(int)))
{
ACE_DEBUG((LM_ERROR,
ACE_TEXT("failed to ACE_OS::setsockopt(%u, %s): \"%m\", aborting\n"),
handle_in,
((option_in == SO_SNDBUF) ? ACE_TEXT("SO_SNDBUF") : ACE_TEXT("SO_RCVBUF"))));
int error = ACE_OS::last_error();
if (error != ENOTSOCK) // <-- socket has been closed asynchronously
ACE_DEBUG((LM_ERROR,
ACE_TEXT("failed to ACE_OS::setsockopt(%u, %s): \"%m\", aborting\n"),
reinterpret_cast<unsigned int>(const_cast<ACE_HANDLE&>(handle_in)),
((option_in == SO_SNDBUF) ? ACE_TEXT("SO_SNDBUF") : ACE_TEXT("SO_RCVBUF"))));

return false;
} // end IF
Expand All @@ -1251,20 +1252,22 @@ RPG_Net_Common_Tools::setSocketBuffer(const ACE_HANDLE& handle_in,
reinterpret_cast<char*>(&size),
&retsize))
{
ACE_DEBUG((LM_ERROR,
ACE_TEXT("failed to ACE_OS::getsockopt(%u, %s): \"%m\", aborting\n"),
handle_in,
((option_in == SO_SNDBUF) ? ACE_TEXT("SO_SNDBUF") : ACE_TEXT("SO_RCVBUF"))));
int error = ACE_OS::last_error();
if (error != ENOTSOCK) // <-- socket has been closed asynchronously
ACE_DEBUG((LM_ERROR,
ACE_TEXT("failed to ACE_OS::getsockopt(%u, %s): \"%m\", aborting\n"),
reinterpret_cast<unsigned int>(const_cast<ACE_HANDLE&>(handle_in)),
((option_in == SO_SNDBUF) ? ACE_TEXT("SO_SNDBUF") : ACE_TEXT("SO_RCVBUF"))));

return false;
} // end IF

if (size != size_in)
{
ACE_DEBUG((LM_WARNING,
ACE_TEXT("ACE_OS::getsockopt(%s) on handle (ID: %d) returned %d (expected: %d), aborting\n"),
ACE_TEXT("ACE_OS::getsockopt(%s) on handle %u returned %d (expected: %d), aborting\n"),
((option_in == SO_SNDBUF) ? ACE_TEXT("SO_SNDBUF") : ACE_TEXT("SO_RCVBUF")),
handle_in,
reinterpret_cast<unsigned int>(const_cast<ACE_HANDLE&>(handle_in)),
size,
size_in));

Expand All @@ -1276,10 +1279,10 @@ RPG_Net_Common_Tools::setSocketBuffer(const ACE_HANDLE& handle_in,
} // end IF

//ACE_DEBUG((LM_DEBUG,
// ACE_TEXT("set \"%s\" option of socket (ID: %d) to: %d\n"),
// ACE_TEXT("set \"%s\" option of socket %u to: %d\n"),
// ((option_in == SO_RCVBUF) ? ACE_TEXT("SO_RCVBUF")
// : ACE_TEXT("SO_SNDBUF")),
// handle_in,
// static_cast<unsigned int>(handle_in),
// size));

return true;
Expand All @@ -1298,9 +1301,11 @@ RPG_Net_Common_Tools::setNoDelay(const ACE_HANDLE& handle_in,
reinterpret_cast<const char*>(&value),
sizeof(int)))
{
ACE_DEBUG((LM_ERROR,
ACE_TEXT("failed to ACE_OS::setsockopt(%u, TCP_NODELAY): \"%m\", aborting\n"),
handle_in));
int error = ACE_OS::last_error();
if (error != ENOTSOCK) // <-- socket has been closed asynchronously
ACE_DEBUG((LM_ERROR,
ACE_TEXT("failed to ACE_OS::setsockopt(%u, TCP_NODELAY): \"%m\", aborting\n"),
reinterpret_cast<unsigned int>(const_cast<ACE_HANDLE&>(handle_in))));

return false;
} // end IF
Expand All @@ -1315,16 +1320,18 @@ RPG_Net_Common_Tools::setNoDelay(const ACE_HANDLE& handle_in,
&retsize) ||
(retsize != sizeof(int)))
{
ACE_DEBUG((LM_ERROR,
ACE_TEXT("failed to ACE_OS::getsockopt(%u, TCP_NODELAY): \"%m\", aborting\n"),
handle_in));
int error = ACE_OS::last_error();
if (error != ENOTSOCK) // <-- socket has been closed asynchronously
ACE_DEBUG((LM_ERROR,
ACE_TEXT("failed to ACE_OS::getsockopt(%u, TCP_NODELAY): \"%m\", aborting\n"),
reinterpret_cast<unsigned int>(const_cast<ACE_HANDLE&>(handle_in))));

return false;
} // end IF

//ACE_DEBUG((LM_DEBUG,
// ACE_TEXT("setsockopt(%u, TCP_NODELAY): %s\n"),
// handle_in,
// static_cast<unsigned int>(handle_in),
// (noDelay_in ? ((value == 1) ? "on" : "off")
// : ((value == 0) ? "off" : "on"))));

Expand Down Expand Up @@ -1652,11 +1659,36 @@ RPG_Net_Common_Tools::retrieveSignalInfo(const int& signal_in,
}
} // end SWITCH
#else
// under Windows(TM), we only have the handle(s)...
ACE_UNUSED_ARG(signal_in);
switch (signal_in)
{
case SIGINT:
information << ACE_TEXT("SIGINT"); break;
case SIGILL:
information << ACE_TEXT("SIGILL"); break;
case SIGFPE:
information << ACE_TEXT("SIGFPE"); break;
case SIGSEGV:
information << ACE_TEXT("SIGSEGV"); break;
case SIGTERM:
information << ACE_TEXT("SIGTERM"); break;
case SIGBREAK:
information << ACE_TEXT("SIGBREAK"); break;
case SIGABRT:
information << ACE_TEXT("SIGABRT"); break;
case SIGABRT_COMPAT:
information << ACE_TEXT("SIGABRT_COMPAT"); break;
default:
{
ACE_DEBUG((LM_DEBUG,
ACE_TEXT("invalid/unknown signal: %S, continuing\n"),
signal_in));

break;
}
} // end SWITCH

//information << ACE_TEXT(", signalled handle: ");
//information << info_in.si_handle_;
information << ACE_TEXT(", signalled handle: ");
information << info_in.si_handle_;
//information << ACE_TEXT(", array of signalled handle(s): ");
//information << info_in.si_handles_;
#endif
Expand Down
25 changes: 13 additions & 12 deletions net/rpg_net_connection_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
#include "rpg_net_iconnectionmanager.h"
#include "rpg_net_sockethandler_base.h"

#include <rpg_common_istatistic.h>
#include <rpg_common_idumpstate.h>
#include "rpg_common_istatistic.h"
#include "rpg_common_idumpstate.h"

#include <ace/Singleton.h>
#include <ace/Synch.h>
Expand Down Expand Up @@ -56,6 +56,11 @@ class RPG_Net_Connection_Manager
// *NOTE*: argument is passed in init() to EVERY new connection during registration
void set(const ConfigType&); // (user) data

// implement RPG_Common_IControl
virtual void start();
virtual void stop();
virtual bool isRunning() const;

// *NOTE*: users of this method should be aware of potential race
// conditions with this method and (de-)registerConnection.
// Scenario: well-behaved shutdown going on WHILE client closes a connection.
Expand All @@ -67,6 +72,7 @@ class RPG_Net_Connection_Manager

// *WARNING*: to be used for testing ONLY !
void abortOldestConnection();
void abortNewestConnection();

// implement RPG_Common_IStatistic
virtual void report() const;
Expand All @@ -76,11 +82,9 @@ class RPG_Net_Connection_Manager

private:
typedef RPG_Net_IConnection<ConfigType, StatisticsContainerType> CONNECTION_TYPE;
// typedef std::list<CONNECTION_TYPE*> CONNECTIONLIST_TYPE;
// *NOTE*: cannot write this - it confuses gcc...
// typedef CONNECTIONLIST_TYPE::const_iterator CONNECTIONLIST_CONSTITERATOR_TYPE;
// typedef CONNECTIONLIST_TYPE::iterator CONNECTIONLIST_ITERATOR_TYPE;
typedef ACE_DLList<CONNECTION_TYPE> CONNECTIONLIST_TYPE;
typedef ACE_DLList_Iterator<CONNECTION_TYPE> CONNECTIONLIST_ITERATOR_TYPE;
typedef ACE_DLList_Reverse_Iterator<CONNECTION_TYPE> CONNECTIONLIST_REVERSEITERATOR_TYPE;

// *NOTE*: these are used by RPG_Net_SocketHandler_Base
virtual bool registerConnection(CONNECTION_TYPE*); // connection
Expand All @@ -104,13 +108,10 @@ class RPG_Net_Connection_Manager
mutable ACE_Condition<ACE_Recursive_Thread_Mutex> myCondition;

unsigned int myMaxNumConnections;
// CONNECTIONLIST_TYPE myConnections;
ACE_DLList<CONNECTION_TYPE> myConnections;

// handler data
ConfigType myUserData;

CONNECTIONLIST_TYPE myConnections;
ConfigType myUserData; // handler data
bool myIsInitialized;
bool myIsActive;
};

// include template implementation
Expand Down
Loading

0 comments on commit 13d541e

Please sign in to comment.