-
Notifications
You must be signed in to change notification settings - Fork 9.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
implement MSGQ to ZMQ bridge with subscriber-based publishing
- Loading branch information
Showing
4 changed files
with
212 additions
and
62 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
#include "cereal/messaging/msgq_to_zmq.h" | ||
|
||
#include <cassert> | ||
|
||
#include "common/util.h" | ||
|
||
extern ExitHandler do_exit; | ||
|
||
static std::string recv_zmq_msg(void *sock) { | ||
zmq_msg_t msg; | ||
zmq_msg_init(&msg); | ||
std::string ret; | ||
if (zmq_msg_recv(&msg, sock, 0) > 0) { | ||
ret.assign((char *)zmq_msg_data(&msg), zmq_msg_size(&msg)); | ||
} | ||
zmq_msg_close(&msg); | ||
return ret; | ||
} | ||
|
||
void MsgqToZmq::run(const std::vector<std::string> &endpoints, const std::string &ip) { | ||
zmq_context = std::make_unique<ZMQContext>(); | ||
msgq_context = std::make_unique<MSGQContext>(); | ||
|
||
// Create ZMQPubSockets for each endpoint | ||
for (auto endpoint : endpoints) { | ||
auto &socket_pair = socket_pairs.emplace_back(); | ||
socket_pair.endpoint = endpoint; | ||
socket_pair.pub_sock = std::make_unique<ZMQPubSocket>(); | ||
int ret = socket_pair.pub_sock->connect(zmq_context.get(), endpoint); | ||
if (ret != 0) { | ||
printf("Failed to create ZMQ publisher for [%s]: %s\n", endpoint.c_str(), zmq_strerror(zmq_errno())); | ||
return; | ||
} | ||
} | ||
|
||
// Start ZMQ monitoring thread to monitor socket events | ||
std::thread thread(&MsgqToZmq::zmqMonitorThread, this); | ||
|
||
while (!do_exit) { | ||
{ | ||
std::unique_lock lk(mutex); | ||
cv.wait(lk, [this]() { return do_exit || !sub2pub.empty(); }); | ||
if (do_exit) break; | ||
|
||
for (auto sub_sock : msgq_poller->poll(100)) { | ||
std::unique_ptr<Message> msg(sub_sock->receive(true)); | ||
while (msg && sub2pub[sub_sock]->sendMessage(msg.get()) == -1) { | ||
if (errno != EINTR) break; | ||
} | ||
} | ||
} | ||
util::sleep_for(1); // Give zmqMonitorThread a chance to acquire the mutex | ||
} | ||
|
||
thread.join(); | ||
} | ||
|
||
void MsgqToZmq::zmqMonitorThread() { | ||
std::vector<zmq_pollitem_t> pollitems; | ||
|
||
// Set up ZMQ monitor for each pub socket | ||
for (int i = 0; i < socket_pairs.size(); ++i) { | ||
std::string addr = "inproc://op-bridge-monitor-" + std::to_string(i); | ||
zmq_socket_monitor(socket_pairs[i].pub_sock->sock, addr.c_str(), ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED); | ||
|
||
void *monitor_socket = zmq_socket(zmq_context->getRawContext(), ZMQ_PAIR); | ||
zmq_connect(monitor_socket, addr.c_str()); | ||
pollitems.emplace_back(zmq_pollitem_t{.socket = monitor_socket, .events = ZMQ_POLLIN}); | ||
} | ||
|
||
while (!do_exit) { | ||
int ret = zmq_poll(pollitems.data(), pollitems.size(), 1000); | ||
if (ret < 0) { | ||
if (errno == EINTR) { | ||
// Due to frequent EINTR signals from msgq, introduce a brief delay (200 ms) | ||
// to reduce CPU usage during retry attempts. | ||
util::sleep_for(200); | ||
} | ||
continue; | ||
} | ||
|
||
for (int i = 0; i < pollitems.size(); ++i) { | ||
if (pollitems[i].revents & ZMQ_POLLIN) { | ||
// First frame in message contains event number and value | ||
std::string frame = recv_zmq_msg(pollitems[i].socket); | ||
if (frame.empty()) continue; | ||
|
||
uint16_t event_type = *(uint16_t *)(frame.data()); | ||
|
||
// Second frame in message contains event address | ||
frame = recv_zmq_msg(pollitems[i].socket); | ||
if (frame.empty()) continue; | ||
|
||
std::unique_lock lk(mutex); | ||
auto &pair = socket_pairs[i]; | ||
if (event_type & ZMQ_EVENT_ACCEPTED) { | ||
printf("socket [%s] connected\n", pair.endpoint.c_str()); | ||
if (++pair.connected_clients == 1) { | ||
// Create new MSGQ subscriber socket and map to ZMQ publisher | ||
pair.sub_sock = std::make_unique<MSGQSubSocket>(); | ||
pair.sub_sock->connect(msgq_context.get(), pair.endpoint, "127.0.0.1"); | ||
sub2pub[pair.sub_sock.get()] = pair.pub_sock.get(); | ||
registerSockets(); | ||
} | ||
} else if (event_type & ZMQ_EVENT_DISCONNECTED) { | ||
printf("socket [%s] disconnected\n", pair.endpoint.c_str()); | ||
if (pair.connected_clients == 0 || --pair.connected_clients == 0) { | ||
// Remove MSGQ subscriber socket from mapping and reset it | ||
sub2pub.erase(pair.sub_sock.get()); | ||
pair.sub_sock.reset(nullptr); | ||
registerSockets(); | ||
} | ||
} | ||
cv.notify_one(); | ||
} | ||
} | ||
} | ||
|
||
// Clean up monitor sockets | ||
for (int i = 0; i < pollitems.size(); ++i) { | ||
zmq_socket_monitor(socket_pairs[i].pub_sock->sock, nullptr, 0); | ||
zmq_close(pollitems[i].socket); | ||
} | ||
cv.notify_one(); | ||
} | ||
|
||
void MsgqToZmq::registerSockets() { | ||
msgq_poller = std::make_unique<MSGQPoller>(); | ||
for (const auto &socket_pair : socket_pairs) { | ||
if (socket_pair.sub_sock) { | ||
msgq_poller->registerSocket(socket_pair.sub_sock.get()); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
#pragma once | ||
|
||
#include <condition_variable> | ||
#include <map> | ||
#include <memory> | ||
#include <mutex> | ||
#include <string> | ||
#include <vector> | ||
|
||
#define private public | ||
#include "msgq/impl_msgq.h" | ||
#include "msgq/impl_zmq.h" | ||
|
||
class MsgqToZmq { | ||
public: | ||
MsgqToZmq() {} | ||
void run(const std::vector<std::string> &endpoints, const std::string &ip); | ||
|
||
protected: | ||
void registerSockets(); | ||
void zmqMonitorThread(); | ||
|
||
struct SocketPair { | ||
std::string endpoint; | ||
std::unique_ptr<ZMQPubSocket> pub_sock; | ||
std::unique_ptr<MSGQSubSocket> sub_sock; | ||
int connected_clients = 0; | ||
}; | ||
|
||
std::unique_ptr<MSGQContext> msgq_context; | ||
std::unique_ptr<ZMQContext> zmq_context; | ||
std::mutex mutex; | ||
std::condition_variable cv; | ||
std::unique_ptr<MSGQPoller> msgq_poller; | ||
std::map<SubSocket *, ZMQPubSocket *> sub2pub; | ||
std::vector<SocketPair> socket_pairs; | ||
}; |