From 4cf55c436c57cff9632298f987b4b18c0d2b2a09 Mon Sep 17 00:00:00 2001 From: James O'Hea Date: Fri, 9 Aug 2024 09:15:57 +0000 Subject: [PATCH] Give HandleRxSocket an extra argument so can use context_threads --- cpp/data/eigerfan/include/EigerFan.h | 2 +- cpp/data/eigerfan/src/EigerFan.cpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/data/eigerfan/include/EigerFan.h b/cpp/data/eigerfan/include/EigerFan.h index 3fda29b..2fbf66e 100644 --- a/cpp/data/eigerfan/include/EigerFan.h +++ b/cpp/data/eigerfan/include/EigerFan.h @@ -36,7 +36,7 @@ class EigerFan { EigerFan(EigerFanConfig config_); virtual ~EigerFan(); void run(); - void HandleRxSocket(std::string& endpoint); + void HandleRxSocket(std::string& endpoint, int num_zmq_context_threads); void Stop(); void SetNumberOfConsumers(int number); diff --git a/cpp/data/eigerfan/src/EigerFan.cpp b/cpp/data/eigerfan/src/EigerFan.cpp index f52b8bc..1f4ca6c 100644 --- a/cpp/data/eigerfan/src/EigerFan.cpp +++ b/cpp/data/eigerfan/src/EigerFan.cpp @@ -308,7 +308,7 @@ void EigerFan::run() { // Spawn rx thread LOG4CXX_INFO(log, "Spawning rx thread"); this->rx_thread_ = boost::shared_ptr( - new boost::thread(boost::bind(&EigerFan::HandleRxSocket, this, streamConnectionAddress)) + new boost::thread(boost::bind(&EigerFan::HandleRxSocket, this, streamConnectionAddress, config.num_zmq_context_threads)) ); while (state != WAITING_STREAM) { @@ -360,8 +360,8 @@ void EigerFan::run() { /** * Connect broker to detector and handle the messages it produces */ -void EigerFan::HandleRxSocket(std::string& endpoint) { - zmq::context_t inproc_context(8); +void EigerFan::HandleRxSocket(std::string& endpoint, int num_zmq_context_threads) { + zmq::context_t inproc_context(num_zmq_context_threads); zmq::socket_t rx_socket(inproc_context, ZMQ_PULL); rx_socket.setsockopt(ZMQ_RCVHWM, &RECEIVE_HWM, sizeof(RECEIVE_HWM)); rx_socket.bind(BROKER_INPROC_ENDPOINT.c_str());