Skip to content

Commit

Permalink
implement MSGQ to ZMQ bridge with subscriber-based publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed Jul 8, 2024
1 parent 241fb90 commit 0f51583
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 62 deletions.
2 changes: 1 addition & 1 deletion cereal/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ env.SharedLibrary('cereal_shared', cereal_objects)
# Build messaging

services_h = env.Command(['services.h'], ['services.py'], 'python3 ' + cereal_dir.path + '/services.py > $TARGET')
env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[msgq, 'zmq', common])
env.Program('messaging/bridge', ['messaging/bridge.cc', 'messaging/msgq_to_zmq.cc'], LIBS=[msgq, 'zmq', common, 'pthread'])


socketmaster = env.SharedObject(['messaging/socketmaster.cc'])
Expand Down
101 changes: 40 additions & 61 deletions cereal/messaging/bridge.cc
Original file line number Diff line number Diff line change
@@ -1,25 +1,10 @@
#include <algorithm>
#include <cassert>
#include <csignal>
#include <iostream>
#include <map>
#include <string>

typedef void (*sighandler_t)(int sig);

#include "cereal/messaging/msgq_to_zmq.h"
#include "cereal/services.h"
#include "msgq/impl_msgq.h"
#include "msgq/impl_zmq.h"
#include "common/util.h"

std::atomic<bool> do_exit = false;
static void set_do_exit(int sig) {
do_exit = true;
}

void sigpipe_handler(int sig) {
assert(sig == SIGPIPE);
std::cout << "SIGPIPE received" << std::endl;
}
ExitHandler do_exit;

static std::vector<std::string> get_services(std::string whitelist_str, bool zmq_to_msgq) {
std::vector<std::string> service_list;
Expand All @@ -34,59 +19,53 @@ static std::vector<std::string> get_services(std::string whitelist_str, bool zmq
return service_list;
}

int main(int argc, char** argv) {
signal(SIGPIPE, (sighandler_t)sigpipe_handler);
signal(SIGINT, (sighandler_t)set_do_exit);
signal(SIGTERM, (sighandler_t)set_do_exit);

bool zmq_to_msgq = argc > 2;
std::string ip = zmq_to_msgq ? argv[1] : "127.0.0.1";
std::string whitelist_str = zmq_to_msgq ? std::string(argv[2]) : "";
void msgq_to_zmq(const std::vector<std::string> &endpoints, const std::string &ip) {
MsgqToZmq bridge;
bridge.run(endpoints, ip);
}

Poller *poller;
Context *pub_context;
Context *sub_context;
if (zmq_to_msgq) { // republishes zmq debugging messages as msgq
poller = new ZMQPoller();
pub_context = new MSGQContext();
sub_context = new ZMQContext();
} else {
poller = new MSGQPoller();
pub_context = new ZMQContext();
sub_context = new MSGQContext();
}
void zmq_to_msgq(const std::vector<std::string> &endpoints, const std::string &ip) {
auto poller = std::make_unique<ZMQPoller>();
auto pub_context = std::make_unique<MSGQContext>();
auto sub_context = std::make_unique<ZMQContext>();
std::map<SubSocket *, PubSocket *> sub2pub;

std::map<SubSocket*, PubSocket*> sub2pub;
for (auto endpoint : get_services(whitelist_str, zmq_to_msgq)) {
PubSocket * pub_sock;
SubSocket * sub_sock;
if (zmq_to_msgq) {
pub_sock = new MSGQPubSocket();
sub_sock = new ZMQSubSocket();
} else {
pub_sock = new ZMQPubSocket();
sub_sock = new MSGQSubSocket();
}
pub_sock->connect(pub_context, endpoint);
sub_sock->connect(sub_context, endpoint, ip, false);
for (auto endpoint : endpoints) {
auto pub_sock = new MSGQPubSocket();
auto sub_sock = new ZMQSubSocket();
pub_sock->connect(pub_context.get(), endpoint);
sub_sock->connect(sub_context.get(), endpoint, ip, false);

poller->registerSocket(sub_sock);
sub2pub[sub_sock] = pub_sock;
}

while (!do_exit) {
for (auto sub_sock : poller->poll(100)) {
Message * msg = sub_sock->receive();
if (msg == NULL) continue;
int ret;
do {
ret = sub2pub[sub_sock]->sendMessage(msg);
} while (ret == -1 && errno == EINTR && !do_exit);
assert(ret >= 0 || do_exit);
delete msg;

if (do_exit) break;
std::unique_ptr<Message> msg(sub_sock->receive(true));
if (msg) {
sub2pub[sub_sock]->sendMessage(msg.get());
}
}
}

// Clean up allocated sockets
for (auto &[sub_sock, pub_sock] : sub2pub) {
delete sub_sock;
delete pub_sock;
}
}

int main(int argc, char **argv) {
bool is_zmq_to_msgq = argc > 2;
std::string ip = is_zmq_to_msgq ? argv[1] : "127.0.0.1";
std::string whitelist_str = is_zmq_to_msgq ? std::string(argv[2]) : "";
std::vector<std::string> endpoints = get_services(whitelist_str, is_zmq_to_msgq);

if (is_zmq_to_msgq) {
zmq_to_msgq(endpoints, ip);
} else {
msgq_to_zmq(endpoints, ip);
}
return 0;
}
134 changes: 134 additions & 0 deletions cereal/messaging/msgq_to_zmq.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#include "cereal/messaging/msgq_to_zmq.h"

#include <cassert>

#include "common/util.h"

extern ExitHandler do_exit;

static std::string recv_zmq_msg(void *sock) {
zmq_msg_t msg;
zmq_msg_init(&msg);
std::string ret;
if (zmq_msg_recv(&msg, sock, 0) > 0) {
ret.assign((char *)zmq_msg_data(&msg), zmq_msg_size(&msg));
}
zmq_msg_close(&msg);
return ret;
}

void MsgqToZmq::run(const std::vector<std::string> &endpoints, const std::string &ip) {
zmq_context = std::make_unique<ZMQContext>();
msgq_context = std::make_unique<MSGQContext>();

// Create ZMQPubSockets for each endpoint
for (auto endpoint : endpoints) {
auto &socket_pair = socket_pairs.emplace_back();
socket_pair.endpoint = endpoint;
socket_pair.pub_sock = std::make_unique<ZMQPubSocket>();
int ret = socket_pair.pub_sock->connect(zmq_context.get(), endpoint);
if (ret != 0) {
printf("Failed to create ZMQ publisher for [%s]: %s\n", endpoint.c_str(), zmq_strerror(zmq_errno()));
return;
}
}

// Start ZMQ monitoring thread to monitor socket events
std::thread thread(&MsgqToZmq::zmqMonitorThread, this);

while (!do_exit) {
{
std::unique_lock lk(mutex);
cv.wait(lk, [this]() { return do_exit || !sub2pub.empty(); });
if (do_exit) break;

for (auto sub_sock : msgq_poller->poll(100)) {
std::unique_ptr<Message> msg(sub_sock->receive(true));
while (msg && sub2pub[sub_sock]->sendMessage(msg.get()) == -1) {
if (errno != EINTR) break;
}
}
}
util::sleep_for(1); // Give zmqMonitorThread a chance to acquire the mutex
}

thread.join();
}

void MsgqToZmq::zmqMonitorThread() {
std::vector<zmq_pollitem_t> pollitems;

// Set up ZMQ monitor for each pub socket
for (int i = 0; i < socket_pairs.size(); ++i) {
std::string addr = "inproc://op-bridge-monitor-" + std::to_string(i);
zmq_socket_monitor(socket_pairs[i].pub_sock->sock, addr.c_str(), ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED);

void *monitor_socket = zmq_socket(zmq_context->getRawContext(), ZMQ_PAIR);
zmq_connect(monitor_socket, addr.c_str());
pollitems.emplace_back(zmq_pollitem_t{.socket = monitor_socket, .events = ZMQ_POLLIN});
}

while (!do_exit) {
int ret = zmq_poll(pollitems.data(), pollitems.size(), 1000);
if (ret < 0) {
if (errno == EINTR) {
// Due to frequent EINTR signals from msgq, introduce a brief delay (200 ms)
// to reduce CPU usage during retry attempts.
util::sleep_for(200);
}
continue;
}

for (int i = 0; i < pollitems.size(); ++i) {
if (pollitems[i].revents & ZMQ_POLLIN) {
// First frame in message contains event number and value
std::string frame = recv_zmq_msg(pollitems[i].socket);
if (frame.empty()) continue;

uint16_t event_type = *(uint16_t *)(frame.data());

// Second frame in message contains event address
frame = recv_zmq_msg(pollitems[i].socket);
if (frame.empty()) continue;

std::unique_lock lk(mutex);
auto &pair = socket_pairs[i];
if (event_type & ZMQ_EVENT_ACCEPTED) {
printf("socket [%s] connected\n", pair.endpoint.c_str());
if (++pair.connected_clients == 1) {
// Create new MSGQ subscriber socket and map to ZMQ publisher
pair.sub_sock = std::make_unique<MSGQSubSocket>();
pair.sub_sock->connect(msgq_context.get(), pair.endpoint, "127.0.0.1");
sub2pub[pair.sub_sock.get()] = pair.pub_sock.get();
registerSockets();
}
} else if (event_type & ZMQ_EVENT_DISCONNECTED) {
printf("socket [%s] disconnected\n", pair.endpoint.c_str());
if (pair.connected_clients == 0 || --pair.connected_clients == 0) {
// Remove MSGQ subscriber socket from mapping and reset it
sub2pub.erase(pair.sub_sock.get());
pair.sub_sock.reset(nullptr);
registerSockets();
}
}
cv.notify_one();
}
}
}

// Clean up monitor sockets
for (int i = 0; i < pollitems.size(); ++i) {
zmq_socket_monitor(socket_pairs[i].pub_sock->sock, nullptr, 0);
zmq_close(pollitems[i].socket);
}
cv.notify_one();
}

void MsgqToZmq::registerSockets() {
msgq_poller = std::make_unique<MSGQPoller>();
for (const auto &socket_pair : socket_pairs) {
if (socket_pair.sub_sock) {
msgq_poller->registerSocket(socket_pair.sub_sock.get());
}
}
}
37 changes: 37 additions & 0 deletions cereal/messaging/msgq_to_zmq.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#define private public
#include "msgq/impl_msgq.h"
#include "msgq/impl_zmq.h"

class MsgqToZmq {
public:
MsgqToZmq() {}
void run(const std::vector<std::string> &endpoints, const std::string &ip);

protected:
void registerSockets();
void zmqMonitorThread();

struct SocketPair {
std::string endpoint;
std::unique_ptr<ZMQPubSocket> pub_sock;
std::unique_ptr<MSGQSubSocket> sub_sock;
int connected_clients = 0;
};

std::unique_ptr<MSGQContext> msgq_context;
std::unique_ptr<ZMQContext> zmq_context;
std::mutex mutex;
std::condition_variable cv;
std::unique_ptr<MSGQPoller> msgq_poller;
std::map<SubSocket *, ZMQPubSocket *> sub2pub;
std::vector<SocketPair> socket_pairs;
};

0 comments on commit 0f51583

Please sign in to comment.