Skip to content

Commit

Permalink
refactor CanParser
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed Jun 8, 2024
1 parent b4842fd commit 47f639b
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 124 deletions.
12 changes: 6 additions & 6 deletions can/common.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <map>
#include <set>
#include <string>
#include <utility>
#include <unordered_map>
Expand Down Expand Up @@ -43,8 +44,7 @@ class MessageState {
unsigned int size;

std::vector<Signal> parse_sigs;
std::vector<double> vals;
std::vector<std::vector<double>> all_vals;
std::map<std::string, SignalValue> values;

uint64_t last_seen_nanos;
uint64_t check_threshold;
Expand All @@ -66,6 +66,7 @@ class CANParser {

const DBC *dbc = NULL;
std::unordered_map<uint32_t, MessageState> message_states;
kj::ArrayPtr<capnp::word> getAlignedData(const std::string &data);

public:
bool can_valid = false;
Expand All @@ -80,13 +81,12 @@ class CANParser {
const std::vector<std::pair<uint32_t, int>> &messages);
CANParser(int abus, const std::string& dbc_name, bool ignore_checksum, bool ignore_counter);
#ifndef DYNAMIC_CAPNP
void update_string(const std::string &data, bool sendcan);
void update_strings(const std::vector<std::string> &data, std::vector<SignalValue> &vals, bool sendcan);
void UpdateCans(uint64_t nanos, const capnp::List<cereal::CanData>::Reader& cans);
MessageState *messageState(uint32_t address) { return &message_states.at(address); }
std::set<uint32_t> update_strings(const std::vector<std::string> &data, bool sendcan);
void UpdateCans(uint64_t nanos, const capnp::List<cereal::CanData>::Reader& cans, std::set<uint32_t> updated_addresses);
#endif
void UpdateCans(uint64_t nanos, const capnp::DynamicStruct::Reader& cans);
void UpdateValid(uint64_t nanos);
void query_latest(std::vector<SignalValue> &vals, uint64_t last_ts = 0);
};

class CANPacker {
Expand Down
10 changes: 7 additions & 3 deletions can/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

from libc.stdint cimport uint8_t, uint16_t, uint32_t, uint64_t
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.pair cimport pair
from libcpp.set cimport set
from libcpp.string cimport string
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
Expand Down Expand Up @@ -53,9 +55,7 @@ cdef extern from "common_dbc.h":
unordered_map[string, const Msg*] name_to_msg

cdef struct SignalValue:
uint32_t address
uint64_t ts_nanos
string name
double value
vector[double] all_values

Expand All @@ -67,11 +67,15 @@ cdef extern from "common_dbc.h":
cdef extern from "common.h":
cdef const DBC* dbc_lookup(const string) except +

cdef cppclass MessageState:
map[string, SignalValue] values

cdef cppclass CANParser:
bool can_valid
bool bus_timeout
CANParser(int, string, vector[pair[uint32_t, int]]) except +
void update_strings(vector[string]&, vector[SignalValue]&, bool) except +
MessageState *messageState(uint32_t address)
set[uint32_t] update_strings(vector[string]&, bool) except +

cdef cppclass CANPacker:
CANPacker(string)
Expand Down
2 changes: 0 additions & 2 deletions can/common_dbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ struct SignalPackValue {
};

struct SignalValue {
uint32_t address;
uint64_t ts_nanos;
std::string name;
double value; // latest value
std::vector<double> all_values; // all values from this cycle
};
Expand Down
106 changes: 39 additions & 67 deletions can/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@
#include <stdexcept>
#include <sstream>

#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/mman.h>

#include "opendbc/can/common.h"

int64_t get_raw_value(const std::vector<uint8_t> &msg, const Signal &sig) {
Expand All @@ -31,13 +26,12 @@ int64_t get_raw_value(const std::vector<uint8_t> &msg, const Signal &sig) {
return ret;
}


bool MessageState::parse(uint64_t nanos, const std::vector<uint8_t> &dat) {
std::vector<double> tmp_vals(parse_sigs.size());
bool checksum_failed = false;
bool counter_failed = false;

for (int i = 0; i < parse_sigs.size(); i++) {
for (int i = 0; i < parse_sigs.size(); ++i) {
const auto &sig = parse_sigs[i];

int64_t tmp = get_raw_value(dat, sig);
Expand Down Expand Up @@ -68,9 +62,11 @@ bool MessageState::parse(uint64_t nanos, const std::vector<uint8_t> &dat) {
return false;
}

for (int i = 0; i < parse_sigs.size(); i++) {
vals[i] = tmp_vals[i];
all_vals[i].push_back(vals[i]);
for (int i = 0; i < parse_sigs.size(); ++i) {
auto &val = values[parse_sigs[i].name];
val.value = tmp_vals[i];
val.ts_nanos = nanos;
val.all_values.push_back(val.value);
}
last_seen_nanos = nanos;

Expand Down Expand Up @@ -127,8 +123,9 @@ CANParser::CANParser(int abus, const std::string& dbc_name, const std::vector<st

// track all signals for this message
state.parse_sigs = msg->sigs;
state.vals.resize(msg->sigs.size());
state.all_vals.resize(msg->sigs.size());
for (auto &sig : msg->sigs) {
state.values[sig.name] = {};
}
}
}

Expand All @@ -149,54 +146,51 @@ CANParser::CANParser(int abus, const std::string& dbc_name, bool ignore_checksum
.ignore_counter = ignore_counter,
};

for (const auto& sig : msg.sigs) {
state.parse_sigs.push_back(sig);
state.vals.push_back(0);
state.all_vals.push_back({});
}

state.parse_sigs = msg.sigs;
message_states[state.address] = state;
// Initialize value entries for each signal in the message
for (auto &sig : msg.sigs) {
message_states[state.address].values[sig.name] = {};
}
}
}

#ifndef DYNAMIC_CAPNP
void CANParser::update_string(const std::string &data, bool sendcan) {
// format for board, make copy due to alignment issues.
kj::ArrayPtr<capnp::word> CANParser::getAlignedData(const std::string &data) {
const size_t buf_size = (data.length() / sizeof(capnp::word)) + 1;
if (aligned_buf.size() < buf_size) {
aligned_buf = kj::heapArray<capnp::word>(buf_size);
}
memcpy(aligned_buf.begin(), data.data(), data.length());
return aligned_buf.slice(0, buf_size);
}

// extract the messages
capnp::FlatArrayMessageReader cmsg(aligned_buf.slice(0, buf_size));
cereal::Event::Reader event = cmsg.getRoot<cereal::Event>();

if (first_nanos == 0) {
first_nanos = event.getLogMonoTime();
std::set<uint32_t> CANParser::update_strings(const std::vector<std::string> &data, bool sendcan) {
// Clear all_values
for (auto &state : message_states) {
for (auto &value : state.second.values) {
value.second.all_values.clear();
}
}
last_nanos = event.getLogMonoTime();

auto cans = sendcan ? event.getSendcan() : event.getCan();
UpdateCans(last_nanos, cans);

UpdateValid(last_nanos);
}

void CANParser::update_strings(const std::vector<std::string> &data, std::vector<SignalValue> &vals, bool sendcan) {
uint64_t current_nanos = 0;
std::set<uint32_t> updated_addresses;
for (const auto &d : data) {
update_string(d, sendcan);
if (current_nanos == 0) {
current_nanos = last_nanos;
capnp::FlatArrayMessageReader cmsg(getAlignedData(d));
cereal::Event::Reader event = cmsg.getRoot<cereal::Event>();

if (first_nanos == 0) {
first_nanos = event.getLogMonoTime();
}
last_nanos = event.getLogMonoTime();

auto cans = sendcan ? event.getSendcan() : event.getCan();
UpdateCans(last_nanos, cans, updated_addresses);
UpdateValid(last_nanos);
}
query_latest(vals, current_nanos);
return updated_addresses;
}

void CANParser::UpdateCans(uint64_t nanos, const capnp::List<cereal::CanData>::Reader& cans) {
//DEBUG("got %d messages\n", cans.size());

void CANParser::UpdateCans(uint64_t nanos, const capnp::List<cereal::CanData>::Reader& cans, std::set<uint32_t> updated_addresses) {
bool bus_empty = true;

// parse the messages
Expand All @@ -214,7 +208,6 @@ void CANParser::UpdateCans(uint64_t nanos, const capnp::List<cereal::CanData>::R
}

auto dat = cmsg.getDat();

if (dat.size() > 64) {
DEBUG("got message longer than 64 bytes: 0x%X %zu\n", cmsg.getAddress(), dat.size());
continue;
Expand All @@ -231,7 +224,9 @@ void CANParser::UpdateCans(uint64_t nanos, const capnp::List<cereal::CanData>::R
size_t data_size = std::max<size_t>(dat.size(), state_it->second.size);
std::vector<uint8_t> data(data_size, 0);
memcpy(data.data(), dat.begin(), dat.size());
state_it->second.parse(nanos, data);
if (state_it->second.parse(nanos, data)) {
updated_addresses.insert(state_it->first);
}
}

// update bus timeout
Expand Down Expand Up @@ -292,26 +287,3 @@ void CANParser::UpdateValid(uint64_t nanos) {
can_invalid_cnt = _valid ? 0 : (can_invalid_cnt + 1);
can_valid = (can_invalid_cnt < CAN_INVALID_CNT) && _counters_valid;
}

void CANParser::query_latest(std::vector<SignalValue> &vals, uint64_t last_ts) {
if (last_ts == 0) {
last_ts = last_nanos;
}
for (auto& kv : message_states) {
auto& state = kv.second;
if (last_ts != 0 && state.last_seen_nanos < last_ts) {
continue;
}

for (int i = 0; i < state.parse_sigs.size(); i++) {
const Signal &sig = state.parse_sigs[i];
SignalValue &v = vals.emplace_back();
v.address = state.address;
v.ts_nanos = state.last_seen_nanos;
v.name = sig.name;
v.value = state.vals[i];
v.all_values = state.all_vals[i];
state.all_vals[i].clear();
}
}
}
Loading

0 comments on commit 47f639b

Please sign in to comment.