diff --git a/selfdrive/pandad/pandad.cc b/selfdrive/pandad/pandad.cc index 9e72ac31ff7693..308a6be40e6a11 100644 --- a/selfdrive/pandad/pandad.cc +++ b/selfdrive/pandad/pandad.cc @@ -164,24 +164,25 @@ Panda *connect(std::string serial="", uint32_t index=0) { return panda.release(); } -void can_send(std::vector &pandas, bool fake_send) { - static AlignedBuffer aligned_buf; - static std::unique_ptr context(Context::create()); - static std::unique_ptr subscriber(SubSocket::create(context.get(), "sendcan")); +void can_send_thread(std::vector pandas, bool fake_send) { + util::set_thread_name("pandad_can_send"); - std::vector> messages; - - // Non-blocking drain of sendcan socket - while (auto msg = subscriber->receive(true)) { - messages.emplace_back(msg); - } + AlignedBuffer aligned_buf; + std::unique_ptr context(Context::create()); + std::unique_ptr subscriber(SubSocket::create(context.get(), "sendcan")); + assert(subscriber != NULL); + subscriber->setTimeout(100); - if (do_exit || !check_all_connected(pandas)) { - return; - } + // run as fast as messages come in + while (!do_exit && check_all_connected(pandas)) { + std::unique_ptr msg(subscriber->receive()); + if (!msg) { + if (errno == EINTR) { + do_exit = true; + } + continue; + } - // Process and send each message to pandas - for (const auto &msg : messages) { capnp::FlatArrayMessageReader cmsg(aligned_buf.align(msg.get())); cereal::Event::Reader event = cmsg.getRoot(); @@ -518,14 +519,17 @@ void process_peripheral_state(Panda *panda, PubMaster *pm, bool no_fan_control) void pandad_run(std::vector &pandas) { const bool no_fan_control = getenv("NO_FAN_CONTROL") != nullptr; const bool spoofing_started = getenv("STARTED") != nullptr; - const bool fack_send = getenv("FAKESEND") != nullptr; + const bool fake_send = getenv("FAKESEND") != nullptr; PubMaster pm({"can", "pandaStates", "peripheralState"}); RateKeeper rk("pandad", 100); // 100 hz + // Start the CAN send thread + std::thread send_thread(can_send_thread, pandas, fake_send); + + // Main loop: receive CAN data and process states while (!do_exit && check_all_connected(pandas)) { can_recv(pandas, &pm); - can_send(pandas, fack_send); // Process peripheral state every 20 Hz if (rk.frame() % 5 == 0) { @@ -544,6 +548,8 @@ void pandad_run(std::vector &pandas) { if (g_safety_future.valid()) { g_safety_future.wait(); } + + send_thread.join(); } void pandad_main_thread(std::vector serials) {