Skip to content

Commit

Permalink
single thread
Browse files Browse the repository at this point in the history
improve comment
  • Loading branch information
deanlee committed Jun 30, 2024
1 parent cec8d5a commit 8a06e68
Showing 1 changed file with 83 additions and 93 deletions.
176 changes: 83 additions & 93 deletions selfdrive/pandad/pandad.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <future>
#include <memory>
#include <thread>
#include <utility>

#include "cereal/gen/cpp/car.capnp.h"
#include "cereal/messaging/messaging.h"
Expand Down Expand Up @@ -45,7 +46,8 @@
#define SATURATE_IL 1000
using namespace std::chrono_literals;

std::atomic<bool> ignition(false);
std::atomic<bool> g_ignition(false);
std::future<bool> g_safety_future;

ExitHandler do_exit;

Expand Down Expand Up @@ -77,7 +79,7 @@ bool safety_setter_thread(std::vector<Panda *> pandas) {

// openpilot can switch between multiplexing modes for different FW queries
while (true) {
if (do_exit || !check_all_connected(pandas) || !ignition) {
if (do_exit || !check_all_connected(pandas) || !g_ignition) {
return false;
}

Expand All @@ -101,7 +103,7 @@ bool safety_setter_thread(std::vector<Panda *> pandas) {
std::string params;
LOGW("waiting for params to set safety model");
while (true) {
if (do_exit || !check_all_connected(pandas) || !ignition) {
if (do_exit || !check_all_connected(pandas) || !g_ignition) {
return false;
}

Expand Down Expand Up @@ -162,25 +164,24 @@ Panda *connect(std::string serial="", uint32_t index=0) {
return panda.release();
}

void can_send_thread(std::vector<Panda *> pandas, bool fake_send) {
util::set_thread_name("pandad_can_send");
void can_send(std::vector<Panda *> &pandas, bool fake_send) {
static AlignedBuffer aligned_buf;
static std::unique_ptr<Context> context(Context::create());
static std::unique_ptr<SubSocket> subscriber(SubSocket::create(context.get(), "sendcan"));

AlignedBuffer aligned_buf;
std::unique_ptr<Context> context(Context::create());
std::unique_ptr<SubSocket> subscriber(SubSocket::create(context.get(), "sendcan"));
assert(subscriber != NULL);
subscriber->setTimeout(100);
std::vector<std::unique_ptr<Message>> messages;

// run as fast as messages come in
while (!do_exit && check_all_connected(pandas)) {
std::unique_ptr<Message> msg(subscriber->receive());
if (!msg) {
if (errno == EINTR) {
do_exit = true;
}
continue;
}
// Non-blocking drain of sendcan socket
while (auto msg = subscriber->receive(true)) {
messages.emplace_back(msg);
}

if (do_exit || !check_all_connected(pandas)) {
return;
}

// Process and send each message to pandas
for (const auto &msg : messages) {
capnp::FlatArrayMessageReader cmsg(aligned_buf.align(msg.get()));
cereal::Event::Reader event = cmsg.getRoot<cereal::Event>();

Expand All @@ -197,16 +198,9 @@ void can_send_thread(std::vector<Panda *> pandas, bool fake_send) {
}
}

void can_recv_thread(std::vector<Panda *> pandas) {
util::set_thread_name("pandad_can_recv");

PubMaster pm({"can"});

// run at 100Hz
RateKeeper rk("pandad_can_recv", 100);
std::vector<can_frame> raw_can_data;

while (!do_exit && check_all_connected(pandas)) {
void can_recv(std::vector<Panda *> &pandas, PubMaster *pm) {
static std::vector<can_frame> raw_can_data;
{
bool comms_healthy = true;
raw_can_data.clear();
for (const auto& panda : pandas) {
Expand All @@ -217,15 +211,13 @@ void can_recv_thread(std::vector<Panda *> pandas) {
auto evt = msg.initEvent();
evt.setValid(comms_healthy);
auto canData = evt.initCan(raw_can_data.size());
for (uint i = 0; i<raw_can_data.size(); i++) {
for (size_t i = 0; i < raw_can_data.size(); ++i) {
canData[i].setAddress(raw_can_data[i].address);
canData[i].setBusTime(raw_can_data[i].busTime);
canData[i].setDat(kj::arrayPtr((uint8_t*)raw_can_data[i].dat.data(), raw_can_data[i].dat.size()));
canData[i].setSrc(raw_can_data[i].src);
}
pm.send("can", msg);

rk.keepTime();
pm->send("can", msg);
}
}

Expand Down Expand Up @@ -404,46 +396,26 @@ void send_peripheral_state(PubMaster *pm, Panda *panda) {
pm->send("peripheralState", msg);
}

void panda_state_thread(std::vector<Panda *> pandas, bool spoofing_started) {
util::set_thread_name("pandad_panda_state");

Params params;
SubMaster sm({"controlsState"});
PubMaster pm({"pandaStates", "peripheralState"});

Panda *peripheral_panda = pandas[0];
bool is_onroad = false;
bool is_onroad_last = false;
std::future<bool> safety_future;
void process_panda_state(std::vector<Panda *> &pandas, PubMaster *pm, bool spoofing_started) {
static Params params;
static SubMaster sm({"controlsState"});
static bool is_onroad_last = false;

std::vector<std::string> connected_serials;
for (Panda *p : pandas) {
connected_serials.push_back(p->hw_serial());
}

LOGD("start panda state thread");

// run at 10hz
RateKeeper rk("panda_state_thread", 10);

while (!do_exit && check_all_connected(pandas)) {
// send out peripheralState at 2Hz
if (sm.frame % 5 == 0) {
send_peripheral_state(&pm, peripheral_panda);
}

auto ignition_opt = send_panda_states(&pm, pandas, spoofing_started);

{
auto ignition_opt = send_panda_states(pm, pandas, spoofing_started);
if (!ignition_opt) {
LOGE("Failed to get ignition_opt");
rk.keepTime();
continue;
return;
}

ignition = *ignition_opt;
g_ignition = *ignition_opt;

// check if we should have pandad reconnect
if (!ignition) {
if (!g_ignition) {
bool comms_healthy = true;
for (const auto &panda : pandas) {
comms_healthy &= panda->comms_healthy();
Expand All @@ -463,18 +435,14 @@ void panda_state_thread(std::vector<Panda *> pandas, bool spoofing_started) {
}
}
}

if (do_exit) {
break;
}
}

is_onroad = params.getBool("IsOnroad");
bool is_onroad = params.getBool("IsOnroad");

// set new safety on onroad transition, after params are cleared
if (is_onroad && !is_onroad_last) {
if (!safety_future.valid() || safety_future.wait_for(0ms) == std::future_status::ready) {
safety_future = std::async(std::launch::async, safety_setter_thread, pandas);
if (!g_safety_future.valid() || g_safety_future.wait_for(0ms) == std::future_status::ready) {
g_safety_future = std::async(std::launch::async, safety_setter_thread, pandas);
} else {
LOGW("Safety setter thread already running");
}
Expand All @@ -488,26 +456,28 @@ void panda_state_thread(std::vector<Panda *> pandas, bool spoofing_started) {
for (const auto &panda : pandas) {
panda->send_heartbeat(engaged);
}

rk.keepTime();
}
}


void peripheral_control_thread(Panda *panda, bool no_fan_control) {
util::set_thread_name("pandad_peripheral_control");
// run at 20 HZ
void process_peripheral_state(Panda *panda, PubMaster *pm, bool no_fan_control) {
static SubMaster sm({"deviceState", "driverCameraState"});

SubMaster sm({"deviceState", "driverCameraState"});
static uint64_t last_driver_camera_t = 0;
static uint16_t prev_fan_speed = 999;
static uint16_t ir_pwr = 0;
static uint16_t prev_ir_pwr = 999;

uint64_t last_driver_camera_t = 0;
uint16_t prev_fan_speed = 999;
uint16_t ir_pwr = 0;
uint16_t prev_ir_pwr = 999;
static FirstOrderFilter integ_lines_filter(0, 30.0, 0.05);

FirstOrderFilter integ_lines_filter(0, 30.0, 0.05);
{
sm.update(0);

while (!do_exit && panda->connected()) {
sm.update(1000);
// send out peripheralState at 2Hz
if (sm.frame % 10 == 0) {
send_peripheral_state(pm, panda);
}

if (sm.updated("deviceState") && !no_fan_control) {
// Fan speed
Expand Down Expand Up @@ -546,9 +516,38 @@ void peripheral_control_thread(Panda *panda, bool no_fan_control) {
}
}

void pandad_main_thread(std::vector<std::string> serials) {
LOGW("launching pandad");
void pandad_run(std::vector<Panda *> &pandas) {
const bool no_fan_control = getenv("NO_FAN_CONTROL") != nullptr;
const bool spoofing_started = getenv("STARTED") != nullptr;
const bool fack_send = getenv("FAKESEND") != nullptr;

PubMaster pm({"can", "pandaStates", "peripheralState"});
RateKeeper rk("pandad", 100); // 100 hz

while (!do_exit && check_all_connected(pandas)) {
can_recv(pandas, &pm);
can_send(pandas, fack_send);

// Process peripheral state every 20 Hz
if (rk.frame() % 5 == 0) {
process_peripheral_state(pandas[0], &pm, no_fan_control);
}

// Process panda state every 10 Hz
if (rk.frame() % 10 == 0) {
process_panda_state(pandas, &pm, spoofing_started);
}

rk.keepTime();
}

// Wait for safety future to finish if valid
if (g_safety_future.valid()) {
g_safety_future.wait();
}
}

void pandad_main_thread(std::vector<std::string> serials) {
if (serials.size() == 0) {
serials = Panda::list();

Expand Down Expand Up @@ -580,16 +579,7 @@ void pandad_main_thread(std::vector<std::string> serials) {

if (!do_exit) {
LOGW("connected to all pandas");

std::vector<std::thread> threads;

threads.emplace_back(panda_state_thread, pandas, getenv("STARTED") != nullptr);
threads.emplace_back(peripheral_control_thread, pandas[0], getenv("NO_FAN_CONTROL") != nullptr);

threads.emplace_back(can_send_thread, pandas, getenv("FAKESEND") != nullptr);
threads.emplace_back(can_recv_thread, pandas);

for (auto &t : threads) t.join();
pandad_run(pandas);
}

for (Panda *panda : pandas) {
Expand Down

0 comments on commit 8a06e68

Please sign in to comment.