Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pandad: refactor to consolidate threads, keep only one can_send thread #32680

Merged
merged 4 commits into from
Aug 7, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 76 additions & 80 deletions selfdrive/pandad/pandad.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <future>
#include <memory>
#include <thread>
#include <utility>

#include "cereal/gen/cpp/car.capnp.h"
#include "cereal/messaging/messaging.h"
Expand Down Expand Up @@ -45,7 +46,8 @@
#define SATURATE_IL 1000
using namespace std::chrono_literals;

std::atomic<bool> ignition(false);
std::atomic<bool> g_ignition(false);
std::future<bool> g_safety_future;

ExitHandler do_exit;

Expand Down Expand Up @@ -77,7 +79,7 @@ bool safety_setter_thread(std::vector<Panda *> pandas) {

// openpilot can switch between multiplexing modes for different FW queries
while (true) {
if (do_exit || !check_all_connected(pandas) || !ignition) {
if (do_exit || !check_all_connected(pandas) || !g_ignition) {
return false;
}

Expand All @@ -101,7 +103,7 @@ bool safety_setter_thread(std::vector<Panda *> pandas) {
std::string params;
LOGW("waiting for params to set safety model");
while (true) {
if (do_exit || !check_all_connected(pandas) || !ignition) {
if (do_exit || !check_all_connected(pandas) || !g_ignition) {
return false;
}

Expand Down Expand Up @@ -197,16 +199,9 @@ void can_send_thread(std::vector<Panda *> pandas, bool fake_send) {
}
}

void can_recv_thread(std::vector<Panda *> pandas) {
util::set_thread_name("pandad_can_recv");

PubMaster pm({"can"});

// run at 100Hz
RateKeeper rk("pandad_can_recv", 100);
std::vector<can_frame> raw_can_data;

while (!do_exit && check_all_connected(pandas)) {
void can_recv(std::vector<Panda *> &pandas, PubMaster *pm) {
static std::vector<can_frame> raw_can_data;
{
bool comms_healthy = true;
raw_can_data.clear();
for (const auto& panda : pandas) {
Expand All @@ -217,14 +212,12 @@ void can_recv_thread(std::vector<Panda *> pandas) {
auto evt = msg.initEvent();
evt.setValid(comms_healthy);
auto canData = evt.initCan(raw_can_data.size());
for (uint i = 0; i<raw_can_data.size(); i++) {
for (size_t i = 0; i < raw_can_data.size(); ++i) {
canData[i].setAddress(raw_can_data[i].address);
canData[i].setDat(kj::arrayPtr((uint8_t*)raw_can_data[i].dat.data(), raw_can_data[i].dat.size()));
canData[i].setSrc(raw_can_data[i].src);
}
pm.send("can", msg);

rk.keepTime();
pm->send("can", msg);
}
}

Expand Down Expand Up @@ -380,7 +373,7 @@ std::optional<bool> send_panda_states(PubMaster *pm, const std::vector<Panda *>
return ignition_local;
}

void send_peripheral_state(PubMaster *pm, Panda *panda) {
void send_peripheral_state(Panda *panda, PubMaster *pm) {
// build msg
MessageBuilder msg;
auto evt = msg.initEvent();
Expand All @@ -403,46 +396,26 @@ void send_peripheral_state(PubMaster *pm, Panda *panda) {
pm->send("peripheralState", msg);
}

void panda_state_thread(std::vector<Panda *> pandas, bool spoofing_started) {
util::set_thread_name("pandad_panda_state");

Params params;
SubMaster sm({"controlsState"});
PubMaster pm({"pandaStates", "peripheralState"});

Panda *peripheral_panda = pandas[0];
bool is_onroad = false;
bool is_onroad_last = false;
std::future<bool> safety_future;
void process_panda_state(std::vector<Panda *> &pandas, PubMaster *pm, bool spoofing_started) {
static Params params;
static SubMaster sm({"controlsState"});
static bool is_onroad_last = false;

std::vector<std::string> connected_serials;
for (Panda *p : pandas) {
connected_serials.push_back(p->hw_serial());
}

LOGD("start panda state thread");

// run at 10hz
RateKeeper rk("panda_state_thread", 10);

while (!do_exit && check_all_connected(pandas)) {
// send out peripheralState at 2Hz
if (sm.frame % 5 == 0) {
send_peripheral_state(&pm, peripheral_panda);
}

auto ignition_opt = send_panda_states(&pm, pandas, spoofing_started);

{
auto ignition_opt = send_panda_states(pm, pandas, spoofing_started);
if (!ignition_opt) {
LOGE("Failed to get ignition_opt");
rk.keepTime();
continue;
return;
}

ignition = *ignition_opt;
g_ignition = *ignition_opt;

// check if we should have pandad reconnect
if (!ignition) {
if (!g_ignition) {
bool comms_healthy = true;
for (const auto &panda : pandas) {
comms_healthy &= panda->comms_healthy();
Expand All @@ -462,18 +435,14 @@ void panda_state_thread(std::vector<Panda *> pandas, bool spoofing_started) {
}
}
}

if (do_exit) {
break;
}
}

is_onroad = params.getBool("IsOnroad");
bool is_onroad = params.getBool("IsOnroad");

// set new safety on onroad transition, after params are cleared
if (is_onroad && !is_onroad_last) {
if (!safety_future.valid() || safety_future.wait_for(0ms) == std::future_status::ready) {
safety_future = std::async(std::launch::async, safety_setter_thread, pandas);
if (!g_safety_future.valid() || g_safety_future.wait_for(0ms) == std::future_status::ready) {
g_safety_future = std::async(std::launch::async, safety_setter_thread, pandas);
} else {
LOGW("Safety setter thread already running");
}
Expand All @@ -487,27 +456,23 @@ void panda_state_thread(std::vector<Panda *> pandas, bool spoofing_started) {
for (const auto &panda : pandas) {
panda->send_heartbeat(engaged);
}

rk.keepTime();
}
}


void peripheral_control_thread(Panda *panda, bool no_fan_control) {
util::set_thread_name("pandad_peripheral_control");
// run at 20 HZ
void process_peripheral_state(Panda *panda, PubMaster *pm, bool no_fan_control) {
static SubMaster sm({"deviceState", "driverCameraState"});

SubMaster sm({"deviceState", "driverCameraState"});
static uint64_t last_driver_camera_t = 0;
static uint16_t prev_fan_speed = 999;
static uint16_t ir_pwr = 0;
static uint16_t prev_ir_pwr = 999;

uint64_t last_driver_camera_t = 0;
uint16_t prev_fan_speed = 999;
uint16_t ir_pwr = 0;
uint16_t prev_ir_pwr = 999;

FirstOrderFilter integ_lines_filter(0, 30.0, 0.05);

while (!do_exit && panda->connected()) {
sm.update(1000);
static FirstOrderFilter integ_lines_filter(0, 30.0, 0.05);

{
sm.update(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sm.update should probably happen in the main pandad_run loop for readability

Copy link
Contributor Author

@deanlee deanlee Aug 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two submasters updated at different frequencies. For now, keeping updates in their own functions provides better encapsulation. Moving to pandad_run would add complexity. We can address this in a future refactor or another PR.

if (sm.updated("deviceState") && !no_fan_control) {
// Fan speed
uint16_t fan_speed = sm["deviceState"].getDeviceState().getFanSpeedPercentDesired();
Expand Down Expand Up @@ -545,9 +510,49 @@ void peripheral_control_thread(Panda *panda, bool no_fan_control) {
}
}

void pandad_main_thread(std::vector<std::string> serials) {
LOGW("launching pandad");
void pandad_run(std::vector<Panda *> &pandas) {
const bool no_fan_control = getenv("NO_FAN_CONTROL") != nullptr;
const bool spoofing_started = getenv("STARTED") != nullptr;
const bool fake_send = getenv("FAKESEND") != nullptr;

// Start the CAN send thread
std::thread send_thread(can_send_thread, pandas, fake_send);

PubMaster pm({"can", "pandaStates", "peripheralState"});
RateKeeper rk("pandad", 100); // 100 hz
deanlee marked this conversation as resolved.
Show resolved Hide resolved
Panda *peripheral_panda = pandas[0];

// Main loop: receive CAN data and process states
while (!do_exit && check_all_connected(pandas)) {
can_recv(pandas, &pm);

// Process peripheral state at 20 Hz
if (rk.frame() % 5 == 0) {
process_peripheral_state(peripheral_panda, &pm, no_fan_control);
}

// Process panda state at 10 Hz
if (rk.frame() % 10 == 0) {
process_panda_state(pandas, &pm, spoofing_started);
}

// Send out peripheralState at 2Hz
if (rk.frame() % 50 == 0) {
send_peripheral_state(peripheral_panda, &pm);
}

rk.keepTime();
}

// Wait for safety future to finish if valid
deanlee marked this conversation as resolved.
Show resolved Hide resolved
if (g_safety_future.valid()) {
g_safety_future.wait();
}

send_thread.join();
}

void pandad_main_thread(std::vector<std::string> serials) {
if (serials.size() == 0) {
serials = Panda::list();

Expand Down Expand Up @@ -579,16 +584,7 @@ void pandad_main_thread(std::vector<std::string> serials) {

if (!do_exit) {
LOGW("connected to all pandas");

std::vector<std::thread> threads;

threads.emplace_back(panda_state_thread, pandas, getenv("STARTED") != nullptr);
threads.emplace_back(peripheral_control_thread, pandas[0], getenv("NO_FAN_CONTROL") != nullptr);

threads.emplace_back(can_send_thread, pandas, getenv("FAKESEND") != nullptr);
threads.emplace_back(can_recv_thread, pandas);

for (auto &t : threads) t.join();
pandad_run(pandas);
}

for (Panda *panda : pandas) {
Expand Down
Loading