From 008d5968391348658dab63f89472cc6b90c8c382 Mon Sep 17 00:00:00 2001 From: deanlee Date: Mon, 10 Jun 2024 19:51:29 +0800 Subject: [PATCH] single thread --- common/ratekeeper.h | 2 +- selfdrive/pandad/pandad.cc | 154 ++++++++++++++++++------------------- 2 files changed, 74 insertions(+), 82 deletions(-) diff --git a/common/ratekeeper.h b/common/ratekeeper.h index b6e8ac66a6b4ba..e7323c6ec3428e 100644 --- a/common/ratekeeper.h +++ b/common/ratekeeper.h @@ -9,7 +9,7 @@ class RateKeeper { ~RateKeeper() {} bool keepTime(); bool monitorTime(); - inline double frame() const { return frame_; } + inline uint64_t frame() const { return frame_; } inline double remaining() const { return remaining_; } private: diff --git a/selfdrive/pandad/pandad.cc b/selfdrive/pandad/pandad.cc index 095a7893b4665a..2a690b0c4149a4 100644 --- a/selfdrive/pandad/pandad.cc +++ b/selfdrive/pandad/pandad.cc @@ -45,7 +45,8 @@ #define SATURATE_IL 1000 using namespace std::chrono_literals; -std::atomic ignition(false); +std::atomic g_ignition(false); +std::future g_safety_future; ExitHandler do_exit; @@ -77,7 +78,7 @@ bool safety_setter_thread(std::vector pandas) { // openpilot can switch between multiplexing modes for different FW queries while (true) { - if (do_exit || !check_all_connected(pandas) || !ignition) { + if (do_exit || !check_all_connected(pandas) || !g_ignition) { return false; } @@ -101,7 +102,7 @@ bool safety_setter_thread(std::vector pandas) { std::string params; LOGW("waiting for params to set safety model"); while (true) { - if (do_exit || !check_all_connected(pandas) || !ignition) { + if (do_exit || !check_all_connected(pandas) || !g_ignition) { return false; } @@ -162,23 +163,21 @@ Panda *connect(std::string serial="", uint32_t index=0) { return panda.release(); } -void can_send_thread(std::vector pandas, bool fake_send) { - util::set_thread_name("pandad_can_send"); +void can_send(std::vector &pandas, bool fake_send) { + static AlignedBuffer aligned_buf; + static std::unique_ptr context(Context::create()); + static std::unique_ptr subscriber(SubSocket::create(context.get(), "sendcan")); - AlignedBuffer aligned_buf; - std::unique_ptr context(Context::create()); - std::unique_ptr subscriber(SubSocket::create(context.get(), "sendcan")); - assert(subscriber != NULL); - subscriber->setTimeout(100); + const bool non_blocking = true; // run as fast as messages come in while (!do_exit && check_all_connected(pandas)) { - std::unique_ptr msg(subscriber->receive()); + std::unique_ptr msg(subscriber->receive(non_blocking)); if (!msg) { if (errno == EINTR) { do_exit = true; } - continue; + break; } capnp::FlatArrayMessageReader cmsg(aligned_buf.align(msg.get())); @@ -197,16 +196,10 @@ void can_send_thread(std::vector pandas, bool fake_send) { } } -void can_recv_thread(std::vector pandas) { - util::set_thread_name("pandad_can_recv"); +void can_recv(std::vector &pandas, PubMaster *pm) { + static std::vector raw_can_data; - PubMaster pm({"can"}); - - // run at 100Hz - RateKeeper rk("pandad_can_recv", 100); - std::vector raw_can_data; - - while (!do_exit && check_all_connected(pandas)) { + { bool comms_healthy = true; raw_can_data.clear(); for (const auto& panda : pandas) { @@ -223,9 +216,7 @@ void can_recv_thread(std::vector pandas) { canData[i].setDat(kj::arrayPtr((uint8_t*)raw_can_data[i].dat.data(), raw_can_data[i].dat.size())); canData[i].setSrc(raw_can_data[i].src); } - pm.send("can", msg); - - rk.keepTime(); + pm->send("can", msg); } } @@ -404,46 +395,29 @@ void send_peripheral_state(PubMaster *pm, Panda *panda) { pm->send("peripheralState", msg); } -void panda_state_thread(std::vector pandas, bool spoofing_started) { - util::set_thread_name("pandad_panda_state"); - - Params params; - SubMaster sm({"controlsState"}); - PubMaster pm({"pandaStates", "peripheralState"}); +void process_panda_state(std::vector &pandas, PubMaster *pm, bool spoofing_started) { + static Params params; + static SubMaster sm({"controlsState"}); - Panda *peripheral_panda = pandas[0]; - bool is_onroad = false; - bool is_onroad_last = false; - std::future safety_future; + static bool is_onroad_last = false; std::vector connected_serials; for (Panda *p : pandas) { connected_serials.push_back(p->hw_serial()); } - LOGD("start panda state thread"); - - // run at 10hz - RateKeeper rk("panda_state_thread", 10); - - while (!do_exit && check_all_connected(pandas)) { - // send out peripheralState at 2Hz - if (sm.frame % 5 == 0) { - send_peripheral_state(&pm, peripheral_panda); - } - - auto ignition_opt = send_panda_states(&pm, pandas, spoofing_started); + { + auto ignition_opt = send_panda_states(pm, pandas, spoofing_started); if (!ignition_opt) { LOGE("Failed to get ignition_opt"); - rk.keepTime(); - continue; + return; } - ignition = *ignition_opt; + g_ignition = *ignition_opt; // check if we should have pandad reconnect - if (!ignition) { + if (!g_ignition) { bool comms_healthy = true; for (const auto &panda : pandas) { comms_healthy &= panda->comms_healthy(); @@ -463,18 +437,14 @@ void panda_state_thread(std::vector pandas, bool spoofing_started) { } } } - - if (do_exit) { - break; - } } - is_onroad = params.getBool("IsOnroad"); + bool is_onroad = params.getBool("IsOnroad"); // set new safety on onroad transition, after params are cleared if (is_onroad && !is_onroad_last) { - if (!safety_future.valid() || safety_future.wait_for(0ms) == std::future_status::ready) { - safety_future = std::async(std::launch::async, safety_setter_thread, pandas); + if (!g_safety_future.valid() || g_safety_future.wait_for(0ms) == std::future_status::ready) { + g_safety_future = std::async(std::launch::async, safety_setter_thread, pandas); } else { LOGW("Safety setter thread already running"); } @@ -488,26 +458,28 @@ void panda_state_thread(std::vector pandas, bool spoofing_started) { for (const auto &panda : pandas) { panda->send_heartbeat(engaged); } - - rk.keepTime(); } } -void peripheral_control_thread(Panda *panda, bool no_fan_control) { - util::set_thread_name("pandad_peripheral_control"); +// run at 20 HZ +void process_peripheral_state(Panda *panda, PubMaster *pm, bool no_fan_control) { + static SubMaster sm({"deviceState", "driverCameraState"}); - SubMaster sm({"deviceState", "driverCameraState"}); + static uint64_t last_driver_camera_t = 0; + static uint16_t prev_fan_speed = 999; + static uint16_t ir_pwr = 0; + static uint16_t prev_ir_pwr = 999; - uint64_t last_driver_camera_t = 0; - uint16_t prev_fan_speed = 999; - uint16_t ir_pwr = 0; - uint16_t prev_ir_pwr = 999; + static FirstOrderFilter integ_lines_filter(0, 30.0, 0.05); - FirstOrderFilter integ_lines_filter(0, 30.0, 0.05); + { + sm.update(0); - while (!do_exit && panda->connected()) { - sm.update(1000); + if (sm.frame % 10 == 0) { + // send out peripheralState at 2Hz + send_peripheral_state(pm, panda); + } if (sm.updated("deviceState") && !no_fan_control) { // Fan speed @@ -546,9 +518,38 @@ void peripheral_control_thread(Panda *panda, bool no_fan_control) { } } -void pandad_main_thread(std::vector serials) { - LOGW("launching pandad"); +void pandad_run(std::vector &pandas) { + const bool no_fan_control = getenv("NO_FAN_CONTROL") != nullptr; + const bool spoofing_started = getenv("STARTED") != nullptr; + const bool fack_send = getenv("FAKESEND") != nullptr; + + PubMaster pm({"can", "pandaStates", "peripheralState"}); + RateKeeper rk("pandad", 100); // 100 hz + + while (!do_exit && check_all_connected(pandas)) { + can_recv(pandas, &pm); + can_send(pandas, fack_send); + + // Process peripheral state every 20 Hz + if (rk.frame() % 5 == 0) { + process_peripheral_state(pandas[0], &pm, no_fan_control); + } + + // Process panda state every 10 Hz + if (rk.frame() % 10 == 0) { + process_panda_state(pandas, &pm, spoofing_started); + } + + rk.keepTime(); + } + + // Wait for safety future to finish if valid + if (g_safety_future.valid()) { + g_safety_future.wait(); + } +} +void pandad_main_thread(std::vector serials) { if (serials.size() == 0) { serials = Panda::list(); @@ -580,16 +581,7 @@ void pandad_main_thread(std::vector serials) { if (!do_exit) { LOGW("connected to all pandas"); - - std::vector threads; - - threads.emplace_back(panda_state_thread, pandas, getenv("STARTED") != nullptr); - threads.emplace_back(peripheral_control_thread, pandas[0], getenv("NO_FAN_CONTROL") != nullptr); - - threads.emplace_back(can_send_thread, pandas, getenv("FAKESEND") != nullptr); - threads.emplace_back(can_recv_thread, pandas); - - for (auto &t : threads) t.join(); + pandad_run(pandas); } for (Panda *panda : pandas) {