From 1ae176980e6833ebd81531d550b8fd8f7410217d Mon Sep 17 00:00:00 2001 From: deanlee Date: Fri, 28 Jun 2024 02:36:38 +0800 Subject: [PATCH] implement MSGQ to ZMQ bridge with subscriber-based publishing --- cereal/SConscript | 2 +- cereal/messaging/bridge.cc | 101 ++++++++++-------------- cereal/messaging/msgq_to_zmq.cc | 134 ++++++++++++++++++++++++++++++++ cereal/messaging/msgq_to_zmq.h | 37 +++++++++ 4 files changed, 212 insertions(+), 62 deletions(-) create mode 100644 cereal/messaging/msgq_to_zmq.cc create mode 100644 cereal/messaging/msgq_to_zmq.h diff --git a/cereal/SConscript b/cereal/SConscript index be5f161dea03a4..fccc48f814f4bb 100644 --- a/cereal/SConscript +++ b/cereal/SConscript @@ -22,7 +22,7 @@ env.SharedLibrary('cereal_shared', cereal_objects) # Build messaging services_h = env.Command(['services.h'], ['services.py'], 'python3 ' + cereal_dir.path + '/services.py > $TARGET') -env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[msgq, 'zmq', common]) +env.Program('messaging/bridge', ['messaging/bridge.cc', 'messaging/msgq_to_zmq.cc'], LIBS=[msgq, 'zmq', common, 'pthread']) socketmaster = env.SharedObject(['messaging/socketmaster.cc']) diff --git a/cereal/messaging/bridge.cc b/cereal/messaging/bridge.cc index 8619c1e22640bb..93af988358df85 100644 --- a/cereal/messaging/bridge.cc +++ b/cereal/messaging/bridge.cc @@ -1,25 +1,10 @@ -#include #include -#include -#include -#include -#include - -typedef void (*sighandler_t)(int sig); +#include "cereal/messaging/msgq_to_zmq.h" #include "cereal/services.h" -#include "msgq/impl_msgq.h" -#include "msgq/impl_zmq.h" +#include "common/util.h" -std::atomic do_exit = false; -static void set_do_exit(int sig) { - do_exit = true; -} - -void sigpipe_handler(int sig) { - assert(sig == SIGPIPE); - std::cout << "SIGPIPE received" << std::endl; -} +ExitHandler do_exit; static std::vector get_services(std::string whitelist_str, bool zmq_to_msgq) { std::vector service_list; @@ -34,41 +19,22 @@ static std::vector get_services(std::string whitelist_str, bool zmq return service_list; } -int main(int argc, char** argv) { - signal(SIGPIPE, (sighandler_t)sigpipe_handler); - signal(SIGINT, (sighandler_t)set_do_exit); - signal(SIGTERM, (sighandler_t)set_do_exit); - - bool zmq_to_msgq = argc > 2; - std::string ip = zmq_to_msgq ? argv[1] : "127.0.0.1"; - std::string whitelist_str = zmq_to_msgq ? std::string(argv[2]) : ""; +void msgq_to_zmq(const std::vector &endpoints, const std::string &ip) { + MsgqToZmq bridge; + bridge.run(endpoints, ip); +} - Poller *poller; - Context *pub_context; - Context *sub_context; - if (zmq_to_msgq) { // republishes zmq debugging messages as msgq - poller = new ZMQPoller(); - pub_context = new MSGQContext(); - sub_context = new ZMQContext(); - } else { - poller = new MSGQPoller(); - pub_context = new ZMQContext(); - sub_context = new MSGQContext(); - } +void zmq_to_msgq(const std::vector &endpoints, const std::string &ip) { + auto poller = std::make_unique(); + auto pub_context = std::make_unique(); + auto sub_context = std::make_unique(); + std::map sub2pub; - std::map sub2pub; - for (auto endpoint : get_services(whitelist_str, zmq_to_msgq)) { - PubSocket * pub_sock; - SubSocket * sub_sock; - if (zmq_to_msgq) { - pub_sock = new MSGQPubSocket(); - sub_sock = new ZMQSubSocket(); - } else { - pub_sock = new ZMQPubSocket(); - sub_sock = new MSGQSubSocket(); - } - pub_sock->connect(pub_context, endpoint); - sub_sock->connect(sub_context, endpoint, ip, false); + for (auto endpoint : endpoints) { + auto pub_sock = new MSGQPubSocket(); + auto sub_sock = new ZMQSubSocket(); + pub_sock->connect(pub_context.get(), endpoint); + sub_sock->connect(sub_context.get(), endpoint, ip, false); poller->registerSocket(sub_sock); sub2pub[sub_sock] = pub_sock; @@ -76,17 +42,30 @@ int main(int argc, char** argv) { while (!do_exit) { for (auto sub_sock : poller->poll(100)) { - Message * msg = sub_sock->receive(); - if (msg == NULL) continue; - int ret; - do { - ret = sub2pub[sub_sock]->sendMessage(msg); - } while (ret == -1 && errno == EINTR && !do_exit); - assert(ret >= 0 || do_exit); - delete msg; - - if (do_exit) break; + std::unique_ptr msg(sub_sock->receive(true)); + if (msg) { + sub2pub[sub_sock]->sendMessage(msg.get()); + } } } + + // Clean up allocated sockets + for (auto &[sub_sock, pub_sock] : sub2pub) { + delete sub_sock; + delete pub_sock; + } +} + +int main(int argc, char **argv) { + bool is_zmq_to_msgq = argc > 2; + std::string ip = is_zmq_to_msgq ? argv[1] : "127.0.0.1"; + std::string whitelist_str = is_zmq_to_msgq ? std::string(argv[2]) : ""; + std::vector endpoints = get_services(whitelist_str, is_zmq_to_msgq); + + if (is_zmq_to_msgq) { + zmq_to_msgq(endpoints, ip); + } else { + msgq_to_zmq(endpoints, ip); + } return 0; } diff --git a/cereal/messaging/msgq_to_zmq.cc b/cereal/messaging/msgq_to_zmq.cc new file mode 100644 index 00000000000000..87ce02ac4d3a32 --- /dev/null +++ b/cereal/messaging/msgq_to_zmq.cc @@ -0,0 +1,134 @@ +#include "cereal/messaging/msgq_to_zmq.h" + +#include + +#include "common/util.h" + +extern ExitHandler do_exit; + +static std::string recv_zmq_msg(void *sock) { + zmq_msg_t msg; + zmq_msg_init(&msg); + std::string ret; + if (zmq_msg_recv(&msg, sock, 0) > 0) { + ret.assign((char *)zmq_msg_data(&msg), zmq_msg_size(&msg)); + } + zmq_msg_close(&msg); + return ret; +} + +void MsgqToZmq::run(const std::vector &endpoints, const std::string &ip) { + zmq_context = std::make_unique(); + msgq_context = std::make_unique(); + + // Create ZMQPubSockets for each endpoint + for (auto endpoint : endpoints) { + auto &socket_pair = socket_pairs.emplace_back(); + socket_pair.endpoint = endpoint; + socket_pair.pub_sock = std::make_unique(); + int ret = socket_pair.pub_sock->connect(zmq_context.get(), endpoint); + if (ret != 0) { + printf("Failed to create ZMQ publisher for [%s]: %s\n", endpoint.c_str(), zmq_strerror(zmq_errno())); + return; + } + } + + // Start ZMQ monitoring thread to monitor socket events + std::thread thread(&MsgqToZmq::zmqMonitorThread, this); + + while (!do_exit) { + { + std::unique_lock lk(mutex); + cv.wait(lk, [this]() { return do_exit || !sub2pub.empty(); }); + if (do_exit) break; + + for (auto sub_sock : msgq_poller->poll(100)) { + std::unique_ptr msg(sub_sock->receive(true)); + while (msg && sub2pub[sub_sock]->sendMessage(msg.get()) == -1) { + if (errno != EINTR) break; + } + } + } + util::sleep_for(1); // Give zmqMonitorThread a chance to acquire the mutex + } + + thread.join(); +} + +void MsgqToZmq::zmqMonitorThread() { + std::vector pollitems; + + // Set up ZMQ monitor for each pub socket + for (int i = 0; i < socket_pairs.size(); ++i) { + std::string addr = "inproc://op-bridge-monitor-" + std::to_string(i); + zmq_socket_monitor(socket_pairs[i].pub_sock->sock, addr.c_str(), ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED); + + void *monitor_socket = zmq_socket(zmq_context->getRawContext(), ZMQ_PAIR); + zmq_connect(monitor_socket, addr.c_str()); + pollitems.emplace_back(zmq_pollitem_t{.socket = monitor_socket, .events = ZMQ_POLLIN}); + } + + while (!do_exit) { + int ret = zmq_poll(pollitems.data(), pollitems.size(), 1000); + if (ret < 0) { + if (errno == EINTR) { + // Due to frequent EINTR signals from msgq, introduce a brief delay (200 ms) + // to reduce CPU usage during retry attempts. + util::sleep_for(200); + } + continue; + } + + for (int i = 0; i < pollitems.size(); ++i) { + if (pollitems[i].revents & ZMQ_POLLIN) { + // First frame in message contains event number and value + std::string frame = recv_zmq_msg(pollitems[i].socket); + if (frame.empty()) continue; + + uint16_t event_type = *(uint16_t *)(frame.data()); + + // Second frame in message contains event address + frame = recv_zmq_msg(pollitems[i].socket); + if (frame.empty()) continue; + + std::unique_lock lk(mutex); + auto &pair = socket_pairs[i]; + if (event_type & ZMQ_EVENT_ACCEPTED) { + printf("socket [%s] connected\n", pair.endpoint.c_str()); + if (++pair.connected_clients == 1) { + // Create new MSGQ subscriber socket and map to ZMQ publisher + pair.sub_sock = std::make_unique(); + pair.sub_sock->connect(msgq_context.get(), pair.endpoint, "127.0.0.1"); + sub2pub[pair.sub_sock.get()] = pair.pub_sock.get(); + registerSockets(); + } + } else if (event_type & ZMQ_EVENT_DISCONNECTED) { + printf("socket [%s] disconnected\n", pair.endpoint.c_str()); + if (pair.connected_clients == 0 || --pair.connected_clients == 0) { + // Remove MSGQ subscriber socket from mapping and reset it + sub2pub.erase(pair.sub_sock.get()); + pair.sub_sock.reset(nullptr); + registerSockets(); + } + } + cv.notify_one(); + } + } + } + + // Clean up monitor sockets + for (int i = 0; i < pollitems.size(); ++i) { + zmq_socket_monitor(socket_pairs[i].pub_sock->sock, nullptr, 0); + zmq_close(pollitems[i].socket); + } + cv.notify_one(); +} + +void MsgqToZmq::registerSockets() { + msgq_poller = std::make_unique(); + for (const auto &socket_pair : socket_pairs) { + if (socket_pair.sub_sock) { + msgq_poller->registerSocket(socket_pair.sub_sock.get()); + } + } +} diff --git a/cereal/messaging/msgq_to_zmq.h b/cereal/messaging/msgq_to_zmq.h new file mode 100644 index 00000000000000..ebdbe5df690e98 --- /dev/null +++ b/cereal/messaging/msgq_to_zmq.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#define private public +#include "msgq/impl_msgq.h" +#include "msgq/impl_zmq.h" + +class MsgqToZmq { +public: + MsgqToZmq() {} + void run(const std::vector &endpoints, const std::string &ip); + +protected: + void registerSockets(); + void zmqMonitorThread(); + + struct SocketPair { + std::string endpoint; + std::unique_ptr pub_sock; + std::unique_ptr sub_sock; + int connected_clients = 0; + }; + + std::unique_ptr msgq_context; + std::unique_ptr zmq_context; + std::mutex mutex; + std::condition_variable cv; + std::unique_ptr msgq_poller; + std::map sub2pub; + std::vector socket_pairs; +};