Skip to content

Commit

Permalink
Merge pull request #21 from DiamondLightSource/multithreaded-fan
Browse files Browse the repository at this point in the history
Merge multithreaded-fan into master
  • Loading branch information
JamesOHeaDLS authored Sep 26, 2024
2 parents f8eacbf + 8556684 commit 0af2633
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 84 deletions.
4 changes: 3 additions & 1 deletion cpp/data/common/include/EigerDefinitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ namespace Eiger {

// EigerFan related constants
const int MORE_MESSAGES = 1;
const int RECEIVE_HWM = 100000;
const int RECEIVE_HWM = 100000; // High water marks for the main receiver thread
const int SEND_HWM = 100000;
const int WORKER_HWM = 10000; // A lower high water mark for the worker threads
const int LINGER_TIMEOUT = 100; // Socket linger timeout in milliseconds

const std::string CONTROL_CMD_KEY = "msg_val";
const std::string CONTROL_ID_KEY = "id";
Expand Down
23 changes: 16 additions & 7 deletions cpp/data/eigerfan/include/EigerFan.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,20 @@
#define RAPIDJSON_HAS_STDSTRING 1

#include <vector>
#include "EigerFanConfig.h"
#include "EigerDefinitions.h"
#include "zmq/zmq.hpp"

#include <log4cxx/logger.h>
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
#include <log4cxx/logger.h>
#include "rapidjson/writer.h"
#include "zmq/zmq.hpp"

#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

#include "EigerFanConfig.h"
#include "EigerDefinitions.h"
#include "MultiPullBroker.h"


class EigerFan {

Expand All @@ -26,13 +32,14 @@ class EigerFan {
} EigerConsumer;

public:

EigerFan();
EigerFan(EigerFanConfig config_);
virtual ~EigerFan();
void run();
void HandleRxSocket(std::string& endpoint, int num_zmq_context_threads);
void Stop();
void SetNumberOfConsumers(int number);

protected:
void HandleStreamMessage(zmq::message_t &message, boost::shared_ptr<zmq::socket_t> socket);
void HandleGlobalHeaderMessage(boost::shared_ptr<zmq::socket_t> socket);
Expand All @@ -42,6 +49,7 @@ class EigerFan {
void HandleMonitorMessage(zmq::message_t &message, boost::shared_ptr<zmq::socket_t> socket, int rank);
void HandleForwardMonitorMessage(zmq::message_t &message, zmq::socket_t &socket);
void HandleControlMessage(zmq::message_t &message, zmq::message_t &idMessage);

void SendMessageToAllConsumers(zmq::message_t &message, int flags = 0);
void SendMessagesToAllConsumers(std::vector<zmq::message_t*> &messageLista);
void SendMessageToSingleConsumer(zmq::message_t &message, int flags = 0);
Expand All @@ -57,6 +65,8 @@ class EigerFan {
zmq::context_t ctx_;
zmq::socket_t controlSocket;
zmq::socket_t forwardSocket;
MultiPullBroker broker;
boost::shared_ptr<boost::thread> rx_thread_;
std::vector<EigerConsumer> consumers;

bool killRequested;
Expand All @@ -75,5 +85,4 @@ class EigerFan {
bool devShmCache;
};


#endif //EIGERDAQ_EIGERFAN_H
26 changes: 13 additions & 13 deletions cpp/data/eigerfan/include/EigerFanConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace EigerFanDefaults {
const std::string DEFAULT_STREAM_ADDRESS = "localhost";
const std::string DEFAULT_EIGER_PORT_NUMBER = Eiger::STREAM_PORT_NUMBER;
const int DEFAULT_FAN_PORT_NUMBER_START = 31600;
const int DEFAULT_NUM_SOCKETS = 1;
const int DEFAULT_NUM_CONTEXT_THREADS = 1;
const int DEFAULT_BLOCK_SIZE = 1;
const std::string DEFAULT_FORWARD_PORT_NUMBER = "9009";
}
Expand All @@ -27,14 +27,14 @@ class EigerFanConfig
public:

EigerFanConfig() :
num_zmq_threads(EigerFanDefaults::DEFAULT_NUM_THREADS),
num_threads(EigerFanDefaults::DEFAULT_NUM_THREADS),
num_consumers(EigerFanDefaults::DEFAULT_NUM_CONSUMERS),
ctrl_channel_port(EigerFanDefaults::DEFAULT_CONTROL_PORT_NUMBER),
eiger_channel_address(EigerFanDefaults::DEFAULT_STREAM_ADDRESS),
eiger_channel_port(EigerFanDefaults::DEFAULT_EIGER_PORT_NUMBER),
forward_channel_port(EigerFanDefaults::DEFAULT_FORWARD_PORT_NUMBER),
fan_channel_port_start(EigerFanDefaults::DEFAULT_FAN_PORT_NUMBER_START),
num_zmq_sockets(EigerFanDefaults::DEFAULT_NUM_SOCKETS),
num_zmq_context_threads(EigerFanDefaults::DEFAULT_NUM_CONTEXT_THREADS),
block_size(EigerFanDefaults::DEFAULT_BLOCK_SIZE)
{
};
Expand All @@ -51,16 +51,16 @@ class EigerFanConfig
num_consumers = numConsumers;
}

void setNum0MQThreads(int numZmqThreads) {
num_zmq_threads = numZmqThreads;
void setNumThreads(int numThreads) {
num_threads = numThreads;
}

void setFanChannelPortStart(int fanChannelPortStart) {
fan_channel_port_start = fanChannelPortStart;
}

void setNum0MQSockets(int numZmqSockets) {
num_zmq_sockets = numZmqSockets;
void setNum0MQContextThreads(int numZmqContextThreads) {
num_zmq_context_threads = numZmqContextThreads;
}

void setBlockSize(int blockSize) {
Expand All @@ -83,16 +83,16 @@ class EigerFanConfig
return num_consumers;
}

int getNum0MQThreads() const {
return num_zmq_threads;
int getNumThreads() const {
return num_threads;
}

int getFanChannelPortStart() const {
return fan_channel_port_start;
}

int getNum0MQSockets() const {
return num_zmq_sockets;
int getNum0MQContextThreads() const {
return num_zmq_context_threads;
}

int getBlockSize() const {
Expand All @@ -105,14 +105,14 @@ class EigerFanConfig

private:

int num_zmq_threads; // Number of 0MQ threads
int num_threads; // Number of 0MQ threads
int num_consumers; // Expected number of consumers
std::string ctrl_channel_port; // Port to bind to for the control channel
std::string eiger_channel_address; // Address to connect to for the Eiger Stream
std::string eiger_channel_port; // Port to connect to for the Eiger Stream
std::string forward_channel_port; // Port to bind to for the forwarding channel
int fan_channel_port_start; // Port to bind to for the fan channel
int num_zmq_sockets; // Number of 0MQ sockets
int num_zmq_context_threads; // Number of 0MQ context threads
int block_size; // Block Size being used by the downstream data file writers

friend class EigerFan;
Expand Down
50 changes: 50 additions & 0 deletions cpp/data/eigerfan/include/MultiPullBroker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Created on: 18/08/2023
* Author: Gary Yendell
*/

#ifndef MULTIPULLBROKER_H
#define MULTIPULLBROKER_H


#include <atomic>
#include <vector>

#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

#include <log4cxx/logger.h>
#include "zmq/zmq.hpp"

class MultiPullBroker {

public:
MultiPullBroker(
std::string& sink_endpoint,
int thread_count
);
~MultiPullBroker();

void connect(std::string& endpoint, void* inproc_context);
void start_message_counter();
uint64_t messages_received();
void shutdown();

protected:

private:
log4cxx::LoggerPtr logger_;

std::vector<boost::shared_ptr<boost::thread> > worker_threads_;
std::string source_endpoint_;
std::string sink_endpoint_;
zmq::context_t* inproc_context_;
int thread_count_;
std::atomic<std::uint64_t> messages_received_;
bool shutdown_requested_;

void worker_loop(std::string& endpoint);

};

#endif // MULTIPULLBROKER_H
Loading

0 comments on commit 0af2633

Please sign in to comment.