Skip to content

Commit

Permalink
Zmq threads and context threads (#25)
Browse files Browse the repository at this point in the history
* Repurpose now unused sockets as contextthreads
* Pass broker cmd line threads or the default
* Give HandleRxSocket an extra argument so can use context_threads
* Add hypen to variable for easier reading
* Clearer option description
* Remove duplicate EigerFan.h reference
* Rename num_zmq_threads to num_threads
  • Loading branch information
JamesOHeaDLS authored Aug 29, 2024
1 parent 772e0e6 commit 57da2a9
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 40 deletions.
2 changes: 1 addition & 1 deletion cpp/data/eigerfan/include/EigerFan.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class EigerFan {
EigerFan(EigerFanConfig config_);
virtual ~EigerFan();
void run();
void HandleRxSocket(std::string& endpoint);
void HandleRxSocket(std::string& endpoint, int num_zmq_context_threads);
void Stop();
void SetNumberOfConsumers(int number);

Expand Down
26 changes: 13 additions & 13 deletions cpp/data/eigerfan/include/EigerFanConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace EigerFanDefaults {
const std::string DEFAULT_STREAM_ADDRESS = "localhost";
const std::string DEFAULT_EIGER_PORT_NUMBER = Eiger::STREAM_PORT_NUMBER;
const int DEFAULT_FAN_PORT_NUMBER_START = 31600;
const int DEFAULT_NUM_SOCKETS = 1;
const int DEFAULT_NUM_CONTEXT_THREADS = 1;
const int DEFAULT_BLOCK_SIZE = 1;
const std::string DEFAULT_FORWARD_PORT_NUMBER = "9009";
}
Expand All @@ -27,14 +27,14 @@ class EigerFanConfig
public:

EigerFanConfig() :
num_zmq_threads(EigerFanDefaults::DEFAULT_NUM_THREADS),
num_threads(EigerFanDefaults::DEFAULT_NUM_THREADS),
num_consumers(EigerFanDefaults::DEFAULT_NUM_CONSUMERS),
ctrl_channel_port(EigerFanDefaults::DEFAULT_CONTROL_PORT_NUMBER),
eiger_channel_address(EigerFanDefaults::DEFAULT_STREAM_ADDRESS),
eiger_channel_port(EigerFanDefaults::DEFAULT_EIGER_PORT_NUMBER),
forward_channel_port(EigerFanDefaults::DEFAULT_FORWARD_PORT_NUMBER),
fan_channel_port_start(EigerFanDefaults::DEFAULT_FAN_PORT_NUMBER_START),
num_zmq_sockets(EigerFanDefaults::DEFAULT_NUM_SOCKETS),
num_zmq_context_threads(EigerFanDefaults::DEFAULT_NUM_CONTEXT_THREADS),
block_size(EigerFanDefaults::DEFAULT_BLOCK_SIZE)
{
};
Expand All @@ -51,16 +51,16 @@ class EigerFanConfig
num_consumers = numConsumers;
}

void setNum0MQThreads(int numZmqThreads) {
num_zmq_threads = numZmqThreads;
void setNumThreads(int numThreads) {
num_threads = numThreads;
}

void setFanChannelPortStart(int fanChannelPortStart) {
fan_channel_port_start = fanChannelPortStart;
}

void setNum0MQSockets(int numZmqSockets) {
num_zmq_sockets = numZmqSockets;
void setNum0MQContextThreads(int numZmqContextThreads) {
num_zmq_context_threads = numZmqContextThreads;
}

void setBlockSize(int blockSize) {
Expand All @@ -83,16 +83,16 @@ class EigerFanConfig
return num_consumers;
}

int getNum0MQThreads() const {
return num_zmq_threads;
int getNumThreads() const {
return num_threads;
}

int getFanChannelPortStart() const {
return fan_channel_port_start;
}

int getNum0MQSockets() const {
return num_zmq_sockets;
int getNum0MQContextThreads() const {
return num_zmq_context_threads;
}

int getBlockSize() const {
Expand All @@ -105,14 +105,14 @@ class EigerFanConfig

private:

int num_zmq_threads; // Number of 0MQ threads
int num_threads; // Number of 0MQ threads
int num_consumers; // Expected number of consumers
std::string ctrl_channel_port; // Port to bind to for the control channel
std::string eiger_channel_address; // Address to connect to for the Eiger Stream
std::string eiger_channel_port; // Port to connect to for the Eiger Stream
std::string forward_channel_port; // Port to bind to for the forwarding channel
int fan_channel_port_start; // Port to bind to for the fan channel
int num_zmq_sockets; // Number of 0MQ sockets
int num_zmq_context_threads; // Number of 0MQ context threads
int block_size; // Block Size being used by the downstream data file writers

friend class EigerFan;
Expand Down
34 changes: 16 additions & 18 deletions cpp/data/eigerfan/src/EigerFan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#include "EigerFan.h"

#include "EigerFan.h"

// Utility variables
int more;
size_t more_size = sizeof (more);
Expand Down Expand Up @@ -73,10 +71,10 @@ std::string PadInt(int value) {
* Default constructor for the EigerFan class
*/
EigerFan::EigerFan()
: ctx_(EigerFanDefaults::DEFAULT_NUM_THREADS),
: ctx_(EigerFanDefaults::DEFAULT_NUM_CONTEXT_THREADS),
controlSocket(ctx_, ZMQ_ROUTER),
forwardSocket(ctx_, ZMQ_PUSH),
broker(BROKER_INPROC_ENDPOINT, 1)
broker(BROKER_INPROC_ENDPOINT, EigerFanDefaults::DEFAULT_NUM_THREADS)
{
this->log = log4cxx::Logger::getLogger("ED.EigerFan");
LOG4CXX_INFO(log, "Creating EigerFan object from default options");
Expand All @@ -99,10 +97,10 @@ EigerFan::EigerFan()
* \param[in] config_ Config options
*/
EigerFan::EigerFan(EigerFanConfig config_)
: ctx_(config_.num_zmq_threads),
: ctx_(config_.num_zmq_context_threads),
controlSocket(ctx_, ZMQ_ROUTER),
forwardSocket(ctx_, ZMQ_PUSH),
broker(BROKER_INPROC_ENDPOINT, 8)
broker(BROKER_INPROC_ENDPOINT, config_.num_threads)
{
this->log = log4cxx::Logger::getLogger("ED.EigerFan");
config = config_;
Expand Down Expand Up @@ -308,7 +306,7 @@ void EigerFan::run() {
// Spawn rx thread
LOG4CXX_INFO(log, "Spawning rx thread");
this->rx_thread_ = boost::shared_ptr<boost::thread>(
new boost::thread(boost::bind(&EigerFan::HandleRxSocket, this, streamConnectionAddress))
new boost::thread(boost::bind(&EigerFan::HandleRxSocket, this, streamConnectionAddress, config.num_zmq_context_threads))
);

while (state != WAITING_STREAM) {
Expand Down Expand Up @@ -360,8 +358,8 @@ void EigerFan::run() {
/**
* Connect broker to detector and handle the messages it produces
*/
void EigerFan::HandleRxSocket(std::string& endpoint) {
zmq::context_t inproc_context(8);
void EigerFan::HandleRxSocket(std::string& endpoint, int num_zmq_context_threads) {
zmq::context_t inproc_context(num_zmq_context_threads);
zmq::socket_t rx_socket(inproc_context, ZMQ_PULL);
rx_socket.setsockopt(ZMQ_RCVHWM, &RECEIVE_HWM, sizeof(RECEIVE_HWM));
rx_socket.bind(BROKER_INPROC_ENDPOINT.c_str());
Expand Down Expand Up @@ -1000,17 +998,17 @@ void EigerFan::HandleControlMessage(zmq::message_t &message, zmq::message_t &idM
rapidjson::Document document;
document.SetObject();

// Add Number of 0MQ threads
rapidjson::Value keyNumZMQThreads("num_zmq_threads", document.GetAllocator());
rapidjson::Value valueNumZMQThreads(config.num_zmq_threads);
document.AddMember(keyNumZMQThreads, valueNumZMQThreads, document.GetAllocator());
// Add Number of threads
rapidjson::Value keyNumThreads("num_threads", document.GetAllocator());
rapidjson::Value valueNumThreads(config.num_threads);
document.AddMember(keyNumThreads, valueNumThreads, document.GetAllocator());

// Add Number of 0MQ sockets
rapidjson::Value keyNumZMQSockets("num_zmq_sockets", document.GetAllocator());
rapidjson::Value valueNumZMQSockets(config.num_zmq_sockets);
document.AddMember(keyNumZMQSockets, valueNumZMQSockets, document.GetAllocator());
// Add Number of 0MQ context threads
rapidjson::Value keyNumZMQContextThreads("num_zmq_context_threads", document.GetAllocator());
rapidjson::Value valueNumZMQContextThreads(config.num_zmq_context_threads);
document.AddMember(keyNumZMQContextThreads, valueNumZMQContextThreads, document.GetAllocator());

// Add Number of 0MQ threads
// Add Number of 0MQ consumers
rapidjson::Value keyNumConsumers("num_consumers", document.GetAllocator());
rapidjson::Value valueNumConsumers(config.num_consumers);
document.AddMember(keyNumConsumers, valueNumConsumers, document.GetAllocator());
Expand Down
16 changes: 8 additions & 8 deletions cpp/data/eigerfan/src/eigerfan_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ int parse_arguments(int argc, char** argv, EigerFanConfig &cfg)
("logconfig,l", po::value<std::string>(),
"Set the log4cxx logging configuration file")
("threads,t", po::value<unsigned int>()->default_value(EigerFanDefaults::DEFAULT_NUM_THREADS),
"Set the number of 0MQ threads for the 0MQ context")
"Set the number of threads to create to pull detector data")
("consumers,n", po::value<unsigned int>()->default_value(EigerFanDefaults::DEFAULT_NUM_CONSUMERS),
"Set the number of expected consumers")
("eigerport,e", po::value<std::string>()->default_value(EigerFanDefaults::DEFAULT_EIGER_PORT_NUMBER),
Expand All @@ -160,8 +160,8 @@ int parse_arguments(int argc, char** argv, EigerFanConfig &cfg)
"Set the port to accept control messages on")
("addr,s", po::value<std::string>()->default_value(EigerFanDefaults::DEFAULT_STREAM_ADDRESS),
"Set the address of the stream to connect to")
("sockets,z", po::value<unsigned int>()->default_value(EigerFanDefaults::DEFAULT_NUM_SOCKETS),
"Set the number of zmq sockets to connect to the Eiger with")
("context-threads,z", po::value<unsigned int>()->default_value(EigerFanDefaults::DEFAULT_NUM_CONTEXT_THREADS),
"Set the number of zmq context threads to connect to the Eiger with")
("blocksize,b", po::value<unsigned int>()->default_value(EigerFanDefaults::DEFAULT_BLOCK_SIZE),
"Set the block size being used by the downstream data file writers to")
;
Expand Down Expand Up @@ -227,8 +227,8 @@ int parse_arguments(int argc, char** argv, EigerFanConfig &cfg)

if (vm.count("threads"))
{
cfg.setNum0MQThreads(vm["threads"].as<unsigned int>());
LOG4CXX_DEBUG(logger, "Setting number of ZeroMQ threads to " << cfg.getNum0MQThreads());
cfg.setNumThreads(vm["threads"].as<unsigned int>());
LOG4CXX_DEBUG(logger, "Setting number of threads to " << cfg.getNumThreads());
}

if (vm.count("consumers"))
Expand Down Expand Up @@ -261,10 +261,10 @@ int parse_arguments(int argc, char** argv, EigerFanConfig &cfg)
LOG4CXX_DEBUG(logger, "Setting Eiger stream address to " << cfg.getEigerChannelAddress());
}

if (vm.count("sockets"))
if (vm.count("context-threads"))
{
cfg.setNum0MQSockets(vm["sockets"].as<unsigned int>());
LOG4CXX_DEBUG(logger, "Setting number of ZeroMQ sockets to " << cfg.getNum0MQSockets());
cfg.setNum0MQContexThreads(vm["context-threads"].as<unsigned int>());
LOG4CXX_DEBUG(logger, "Setting number of ZeroMQ context threads to " << cfg.getNum0MQContextThreads());
}

if (vm.count("blocksize"))
Expand Down

0 comments on commit 57da2a9

Please sign in to comment.