From 57da2a99822519d52cc9845be27d421dba0c3911 Mon Sep 17 00:00:00 2001 From: James O'Hea <128136050+JamesOHeaDLS@users.noreply.github.com> Date: Thu, 29 Aug 2024 13:24:30 +0100 Subject: [PATCH] Zmq threads and context threads (#25) * Repurpose now unused sockets as contextthreads * Pass broker cmd line threads or the default * Give HandleRxSocket an extra argument so can use context_threads * Add hypen to variable for easier reading * Clearer option description * Remove duplicate EigerFan.h reference * Rename num_zmq_threads to num_threads --- cpp/data/eigerfan/include/EigerFan.h | 2 +- cpp/data/eigerfan/include/EigerFanConfig.h | 26 ++++++++--------- cpp/data/eigerfan/src/EigerFan.cpp | 34 ++++++++++------------ cpp/data/eigerfan/src/eigerfan_main.cpp | 16 +++++----- 4 files changed, 38 insertions(+), 40 deletions(-) diff --git a/cpp/data/eigerfan/include/EigerFan.h b/cpp/data/eigerfan/include/EigerFan.h index 3fda29b..2fbf66e 100644 --- a/cpp/data/eigerfan/include/EigerFan.h +++ b/cpp/data/eigerfan/include/EigerFan.h @@ -36,7 +36,7 @@ class EigerFan { EigerFan(EigerFanConfig config_); virtual ~EigerFan(); void run(); - void HandleRxSocket(std::string& endpoint); + void HandleRxSocket(std::string& endpoint, int num_zmq_context_threads); void Stop(); void SetNumberOfConsumers(int number); diff --git a/cpp/data/eigerfan/include/EigerFanConfig.h b/cpp/data/eigerfan/include/EigerFanConfig.h index f0c2e46..eade452 100644 --- a/cpp/data/eigerfan/include/EigerFanConfig.h +++ b/cpp/data/eigerfan/include/EigerFanConfig.h @@ -17,7 +17,7 @@ namespace EigerFanDefaults { const std::string DEFAULT_STREAM_ADDRESS = "localhost"; const std::string DEFAULT_EIGER_PORT_NUMBER = Eiger::STREAM_PORT_NUMBER; const int DEFAULT_FAN_PORT_NUMBER_START = 31600; - const int DEFAULT_NUM_SOCKETS = 1; + const int DEFAULT_NUM_CONTEXT_THREADS = 1; const int DEFAULT_BLOCK_SIZE = 1; const std::string DEFAULT_FORWARD_PORT_NUMBER = "9009"; } @@ -27,14 +27,14 @@ class EigerFanConfig public: EigerFanConfig() : - num_zmq_threads(EigerFanDefaults::DEFAULT_NUM_THREADS), + num_threads(EigerFanDefaults::DEFAULT_NUM_THREADS), num_consumers(EigerFanDefaults::DEFAULT_NUM_CONSUMERS), ctrl_channel_port(EigerFanDefaults::DEFAULT_CONTROL_PORT_NUMBER), eiger_channel_address(EigerFanDefaults::DEFAULT_STREAM_ADDRESS), eiger_channel_port(EigerFanDefaults::DEFAULT_EIGER_PORT_NUMBER), forward_channel_port(EigerFanDefaults::DEFAULT_FORWARD_PORT_NUMBER), fan_channel_port_start(EigerFanDefaults::DEFAULT_FAN_PORT_NUMBER_START), - num_zmq_sockets(EigerFanDefaults::DEFAULT_NUM_SOCKETS), + num_zmq_context_threads(EigerFanDefaults::DEFAULT_NUM_CONTEXT_THREADS), block_size(EigerFanDefaults::DEFAULT_BLOCK_SIZE) { }; @@ -51,16 +51,16 @@ class EigerFanConfig num_consumers = numConsumers; } - void setNum0MQThreads(int numZmqThreads) { - num_zmq_threads = numZmqThreads; + void setNumThreads(int numThreads) { + num_threads = numThreads; } void setFanChannelPortStart(int fanChannelPortStart) { fan_channel_port_start = fanChannelPortStart; } - void setNum0MQSockets(int numZmqSockets) { - num_zmq_sockets = numZmqSockets; + void setNum0MQContextThreads(int numZmqContextThreads) { + num_zmq_context_threads = numZmqContextThreads; } void setBlockSize(int blockSize) { @@ -83,16 +83,16 @@ class EigerFanConfig return num_consumers; } - int getNum0MQThreads() const { - return num_zmq_threads; + int getNumThreads() const { + return num_threads; } int getFanChannelPortStart() const { return fan_channel_port_start; } - int getNum0MQSockets() const { - return num_zmq_sockets; + int getNum0MQContextThreads() const { + return num_zmq_context_threads; } int getBlockSize() const { @@ -105,14 +105,14 @@ class EigerFanConfig private: - int num_zmq_threads; // Number of 0MQ threads + int num_threads; // Number of 0MQ threads int num_consumers; // Expected number of consumers std::string ctrl_channel_port; // Port to bind to for the control channel std::string eiger_channel_address; // Address to connect to for the Eiger Stream std::string eiger_channel_port; // Port to connect to for the Eiger Stream std::string forward_channel_port; // Port to bind to for the forwarding channel int fan_channel_port_start; // Port to bind to for the fan channel - int num_zmq_sockets; // Number of 0MQ sockets + int num_zmq_context_threads; // Number of 0MQ context threads int block_size; // Block Size being used by the downstream data file writers friend class EigerFan; diff --git a/cpp/data/eigerfan/src/EigerFan.cpp b/cpp/data/eigerfan/src/EigerFan.cpp index 93aef2e..93b4702 100644 --- a/cpp/data/eigerfan/src/EigerFan.cpp +++ b/cpp/data/eigerfan/src/EigerFan.cpp @@ -16,8 +16,6 @@ #include "EigerFan.h" -#include "EigerFan.h" - // Utility variables int more; size_t more_size = sizeof (more); @@ -73,10 +71,10 @@ std::string PadInt(int value) { * Default constructor for the EigerFan class */ EigerFan::EigerFan() -: ctx_(EigerFanDefaults::DEFAULT_NUM_THREADS), +: ctx_(EigerFanDefaults::DEFAULT_NUM_CONTEXT_THREADS), controlSocket(ctx_, ZMQ_ROUTER), forwardSocket(ctx_, ZMQ_PUSH), - broker(BROKER_INPROC_ENDPOINT, 1) + broker(BROKER_INPROC_ENDPOINT, EigerFanDefaults::DEFAULT_NUM_THREADS) { this->log = log4cxx::Logger::getLogger("ED.EigerFan"); LOG4CXX_INFO(log, "Creating EigerFan object from default options"); @@ -99,10 +97,10 @@ EigerFan::EigerFan() * \param[in] config_ Config options */ EigerFan::EigerFan(EigerFanConfig config_) -: ctx_(config_.num_zmq_threads), +: ctx_(config_.num_zmq_context_threads), controlSocket(ctx_, ZMQ_ROUTER), forwardSocket(ctx_, ZMQ_PUSH), - broker(BROKER_INPROC_ENDPOINT, 8) + broker(BROKER_INPROC_ENDPOINT, config_.num_threads) { this->log = log4cxx::Logger::getLogger("ED.EigerFan"); config = config_; @@ -308,7 +306,7 @@ void EigerFan::run() { // Spawn rx thread LOG4CXX_INFO(log, "Spawning rx thread"); this->rx_thread_ = boost::shared_ptr( - new boost::thread(boost::bind(&EigerFan::HandleRxSocket, this, streamConnectionAddress)) + new boost::thread(boost::bind(&EigerFan::HandleRxSocket, this, streamConnectionAddress, config.num_zmq_context_threads)) ); while (state != WAITING_STREAM) { @@ -360,8 +358,8 @@ void EigerFan::run() { /** * Connect broker to detector and handle the messages it produces */ -void EigerFan::HandleRxSocket(std::string& endpoint) { - zmq::context_t inproc_context(8); +void EigerFan::HandleRxSocket(std::string& endpoint, int num_zmq_context_threads) { + zmq::context_t inproc_context(num_zmq_context_threads); zmq::socket_t rx_socket(inproc_context, ZMQ_PULL); rx_socket.setsockopt(ZMQ_RCVHWM, &RECEIVE_HWM, sizeof(RECEIVE_HWM)); rx_socket.bind(BROKER_INPROC_ENDPOINT.c_str()); @@ -1000,17 +998,17 @@ void EigerFan::HandleControlMessage(zmq::message_t &message, zmq::message_t &idM rapidjson::Document document; document.SetObject(); - // Add Number of 0MQ threads - rapidjson::Value keyNumZMQThreads("num_zmq_threads", document.GetAllocator()); - rapidjson::Value valueNumZMQThreads(config.num_zmq_threads); - document.AddMember(keyNumZMQThreads, valueNumZMQThreads, document.GetAllocator()); + // Add Number of threads + rapidjson::Value keyNumThreads("num_threads", document.GetAllocator()); + rapidjson::Value valueNumThreads(config.num_threads); + document.AddMember(keyNumThreads, valueNumThreads, document.GetAllocator()); - // Add Number of 0MQ sockets - rapidjson::Value keyNumZMQSockets("num_zmq_sockets", document.GetAllocator()); - rapidjson::Value valueNumZMQSockets(config.num_zmq_sockets); - document.AddMember(keyNumZMQSockets, valueNumZMQSockets, document.GetAllocator()); + // Add Number of 0MQ context threads + rapidjson::Value keyNumZMQContextThreads("num_zmq_context_threads", document.GetAllocator()); + rapidjson::Value valueNumZMQContextThreads(config.num_zmq_context_threads); + document.AddMember(keyNumZMQContextThreads, valueNumZMQContextThreads, document.GetAllocator()); - // Add Number of 0MQ threads + // Add Number of 0MQ consumers rapidjson::Value keyNumConsumers("num_consumers", document.GetAllocator()); rapidjson::Value valueNumConsumers(config.num_consumers); document.AddMember(keyNumConsumers, valueNumConsumers, document.GetAllocator()); diff --git a/cpp/data/eigerfan/src/eigerfan_main.cpp b/cpp/data/eigerfan/src/eigerfan_main.cpp index 9e82bf8..916b1af 100644 --- a/cpp/data/eigerfan/src/eigerfan_main.cpp +++ b/cpp/data/eigerfan/src/eigerfan_main.cpp @@ -149,7 +149,7 @@ int parse_arguments(int argc, char** argv, EigerFanConfig &cfg) ("logconfig,l", po::value(), "Set the log4cxx logging configuration file") ("threads,t", po::value()->default_value(EigerFanDefaults::DEFAULT_NUM_THREADS), - "Set the number of 0MQ threads for the 0MQ context") + "Set the number of threads to create to pull detector data") ("consumers,n", po::value()->default_value(EigerFanDefaults::DEFAULT_NUM_CONSUMERS), "Set the number of expected consumers") ("eigerport,e", po::value()->default_value(EigerFanDefaults::DEFAULT_EIGER_PORT_NUMBER), @@ -160,8 +160,8 @@ int parse_arguments(int argc, char** argv, EigerFanConfig &cfg) "Set the port to accept control messages on") ("addr,s", po::value()->default_value(EigerFanDefaults::DEFAULT_STREAM_ADDRESS), "Set the address of the stream to connect to") - ("sockets,z", po::value()->default_value(EigerFanDefaults::DEFAULT_NUM_SOCKETS), - "Set the number of zmq sockets to connect to the Eiger with") + ("context-threads,z", po::value()->default_value(EigerFanDefaults::DEFAULT_NUM_CONTEXT_THREADS), + "Set the number of zmq context threads to connect to the Eiger with") ("blocksize,b", po::value()->default_value(EigerFanDefaults::DEFAULT_BLOCK_SIZE), "Set the block size being used by the downstream data file writers to") ; @@ -227,8 +227,8 @@ int parse_arguments(int argc, char** argv, EigerFanConfig &cfg) if (vm.count("threads")) { - cfg.setNum0MQThreads(vm["threads"].as()); - LOG4CXX_DEBUG(logger, "Setting number of ZeroMQ threads to " << cfg.getNum0MQThreads()); + cfg.setNumThreads(vm["threads"].as()); + LOG4CXX_DEBUG(logger, "Setting number of threads to " << cfg.getNumThreads()); } if (vm.count("consumers")) @@ -261,10 +261,10 @@ int parse_arguments(int argc, char** argv, EigerFanConfig &cfg) LOG4CXX_DEBUG(logger, "Setting Eiger stream address to " << cfg.getEigerChannelAddress()); } - if (vm.count("sockets")) + if (vm.count("context-threads")) { - cfg.setNum0MQSockets(vm["sockets"].as()); - LOG4CXX_DEBUG(logger, "Setting number of ZeroMQ sockets to " << cfg.getNum0MQSockets()); + cfg.setNum0MQContexThreads(vm["context-threads"].as()); + LOG4CXX_DEBUG(logger, "Setting number of ZeroMQ context threads to " << cfg.getNum0MQContextThreads()); } if (vm.count("blocksize"))