Skip to content

Commit

Permalink
refactor to improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed Jun 6, 2024
1 parent f58bc33 commit 09a5404
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 98 deletions.
9 changes: 5 additions & 4 deletions can/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ class MessageState {
unsigned int size;

std::vector<Signal> parse_sigs;
std::vector<double> vals;
std::vector<std::vector<double>> all_vals;
std::map<std::string, SignalValue> values;

uint64_t last_seen_nanos;
uint64_t check_threshold;
Expand All @@ -66,6 +65,7 @@ class CANParser {

const DBC *dbc = NULL;
std::unordered_map<uint32_t, MessageState> message_states;
kj::ArrayPtr<capnp::word> getAlignedData(const std::string &data);

public:
bool can_valid = false;
Expand All @@ -79,14 +79,15 @@ class CANParser {
CANParser(int abus, const std::string& dbc_name,
const std::vector<std::pair<uint32_t, int>> &messages);
CANParser(int abus, const std::string& dbc_name, bool ignore_checksum, bool ignore_counter);
SignalValue &getValue(uint32_t address, std::string &name);
#ifndef DYNAMIC_CAPNP
void update_string(const std::string &data, bool sendcan);
void update_strings(const std::vector<std::string> &data, std::vector<SignalValue> &vals, bool sendcan);
std::vector<uint32_t> update_strings(const std::vector<std::string> &data, bool sendcan);
void UpdateCans(uint64_t nanos, const capnp::List<cereal::CanData>::Reader& cans);
#endif
void UpdateCans(uint64_t nanos, const capnp::DynamicStruct::Reader& cans);
void UpdateValid(uint64_t nanos);
void query_latest(std::vector<SignalValue> &vals, uint64_t last_ts = 0);
std::vector<uint32_t> query_latest(uint64_t last_ts = 0);
};

class CANPacker {
Expand Down
5 changes: 2 additions & 3 deletions can/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ cdef extern from "common_dbc.h":
vector[Val] vals

cdef struct SignalValue:
uint32_t address
uint64_t ts_nanos
string name
double value
vector[double] all_values

Expand All @@ -68,7 +66,8 @@ cdef extern from "common.h":
bool can_valid
bool bus_timeout
CANParser(int, string, vector[pair[uint32_t, int]]) except +
void update_strings(vector[string]&, vector[SignalValue]&, bool) except +
SignalValue &getValue(uint32_t, string&)
vector[uint32_t] update_strings(vector[string]&, bool) except +

cdef cppclass CANPacker:
CANPacker(string)
Expand Down
2 changes: 0 additions & 2 deletions can/common_dbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ struct SignalPackValue {
};

struct SignalValue {
uint32_t address;
uint64_t ts_nanos;
std::string name;
double value; // latest value
std::vector<double> all_values; // all values from this cycle
};
Expand Down
102 changes: 59 additions & 43 deletions can/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ bool MessageState::parse(uint64_t nanos, const std::vector<uint8_t> &dat) {
bool checksum_failed = false;
bool counter_failed = false;

for (int i = 0; i < parse_sigs.size(); i++) {
for (int i = 0; i < parse_sigs.size(); ++i) {
const auto &sig = parse_sigs[i];

int64_t tmp = get_raw_value(dat, sig);
Expand Down Expand Up @@ -68,9 +68,14 @@ bool MessageState::parse(uint64_t nanos, const std::vector<uint8_t> &dat) {
return false;
}

for (int i = 0; i < parse_sigs.size(); i++) {
vals[i] = tmp_vals[i];
all_vals[i].push_back(vals[i]);
// Update values for each signal
for (int i = 0; i < parse_sigs.size(); ++i) {
// Retrieve the value entry for the current signal
auto &val = values[parse_sigs[i].name];

val.value = tmp_vals[i];
val.ts_nanos = nanos;
val.all_values.push_back(val.value);
}
last_seen_nanos = nanos;

Expand Down Expand Up @@ -138,8 +143,9 @@ CANParser::CANParser(int abus, const std::string& dbc_name, const std::vector<st

// track all signals for this message
state.parse_sigs = msg->sigs;
state.vals.resize(msg->sigs.size());
state.all_vals.resize(msg->sigs.size());
for (auto &sig : msg->sigs) {
state.values[sig.name] = {};
}
}
}

Expand All @@ -160,27 +166,36 @@ CANParser::CANParser(int abus, const std::string& dbc_name, bool ignore_checksum
.ignore_counter = ignore_counter,
};

for (const auto& sig : msg.sigs) {
state.parse_sigs.push_back(sig);
state.vals.push_back(0);
state.all_vals.push_back({});
}

state.parse_sigs = msg.sigs;
message_states[state.address] = state;
// Initialize value entries for each signal in the message
for (auto &sig : msg.sigs) {
message_states[state.address].values[sig.name] = {};
}
}
}

#ifndef DYNAMIC_CAPNP
void CANParser::update_string(const std::string &data, bool sendcan) {
// format for board, make copy due to alignment issues.

// If the input data is already aligned to capnp::word boundaries,
// a direct ArrayPtr is returned. Otherwise, a new ArrayPtr is created
// with the data copied into it, ensuring alignment.
kj::ArrayPtr<capnp::word> CANParser::getAlignedData(const std::string &data) {
bool aligned = reinterpret_cast<uintptr_t>(data.data()) % sizeof(capnp::word) == 0;
if (aligned) {
return kj::ArrayPtr<capnp::word>((capnp::word*)(data.data()), data.size() / sizeof(capnp::word));
}

const size_t buf_size = (data.length() / sizeof(capnp::word)) + 1;
if (aligned_buf.size() < buf_size) {
aligned_buf = kj::heapArray<capnp::word>(buf_size);
}
memcpy(aligned_buf.begin(), data.data(), data.length());
return aligned_buf.slice(0, buf_size);
}

// extract the messages
capnp::FlatArrayMessageReader cmsg(aligned_buf.slice(0, buf_size));
void CANParser::update_string(const std::string &data, bool sendcan) {
capnp::FlatArrayMessageReader cmsg(getAlignedData(data));
cereal::Event::Reader event = cmsg.getRoot<cereal::Event>();

if (first_nanos == 0) {
Expand All @@ -194,24 +209,35 @@ void CANParser::update_string(const std::string &data, bool sendcan) {
UpdateValid(last_nanos);
}

void CANParser::update_strings(const std::vector<std::string> &data, std::vector<SignalValue> &vals, bool sendcan) {
std::vector<uint32_t> CANParser::update_strings(const std::vector<std::string> &data, bool sendcan) {
// Clear all_values
for (auto &state : message_states) {
for (auto &value : state.second.values) {
value.second.all_values.clear();
}
}

if (data.empty()) {
return {};
}

uint64_t current_nanos = 0;
for (const auto &d : data) {
update_string(d, sendcan);
if (current_nanos == 0) {
current_nanos = last_nanos;
}
}
query_latest(vals, current_nanos);
return query_latest(current_nanos);
}

void CANParser::UpdateCans(uint64_t nanos, const capnp::List<cereal::CanData>::Reader& cans) {
//DEBUG("got %d messages\n", cans.size());

std::vector<uint8_t> data;
bool bus_empty = true;

// parse the messages
for (const auto cmsg : cans) {
for (const auto &cmsg : cans) {
if (cmsg.getSrc() != bus) {
// DEBUG("skip %d: wrong bus\n", cmsg.getAddress());
continue;
Expand All @@ -225,7 +251,6 @@ void CANParser::UpdateCans(uint64_t nanos, const capnp::List<cereal::CanData>::R
}

auto dat = cmsg.getDat();

if (dat.size() > 64) {
DEBUG("got message longer than 64 bytes: 0x%X %zu\n", cmsg.getAddress(), dat.size());
continue;
Expand All @@ -236,9 +261,7 @@ void CANParser::UpdateCans(uint64_t nanos, const capnp::List<cereal::CanData>::R
// DEBUG("got message with unexpected length: expected %d, got %zu for %d", state_it->second.size, dat.size(), cmsg.getAddress());
// continue;
//}

std::vector<uint8_t> data(dat.size(), 0);
memcpy(data.data(), dat.begin(), dat.size());
data.assign(dat.begin(), dat.end());
state_it->second.parse(nanos, data);
}

Expand Down Expand Up @@ -301,25 +324,18 @@ void CANParser::UpdateValid(uint64_t nanos) {
can_valid = (can_invalid_cnt < CAN_INVALID_CNT) && _counters_valid;
}

void CANParser::query_latest(std::vector<SignalValue> &vals, uint64_t last_ts) {
if (last_ts == 0) {
last_ts = last_nanos;
}
for (auto& kv : message_states) {
auto& state = kv.second;
if (last_ts != 0 && state.last_seen_nanos < last_ts) {
continue;
}

for (int i = 0; i < state.parse_sigs.size(); i++) {
const Signal &sig = state.parse_sigs[i];
SignalValue &v = vals.emplace_back();
v.address = state.address;
v.ts_nanos = state.last_seen_nanos;
v.name = sig.name;
v.value = state.vals[i];
v.all_values = state.all_vals[i];
state.all_vals[i].clear();
// Retrieve addresses of messages seen since the last_ts timestamp.
std::vector<uint32_t> CANParser::query_latest(uint64_t last_ts) {
std::vector<uint32_t> result;
result.reserve(message_states.size());
for (const auto& [addr, state] : message_states) {
if (state.last_seen_nanos >= last_ts) {
result.push_back(addr);
}
}
return result;
}

SignalValue &CANParser::getValue(uint32_t address, std::string &name) {
return message_states.at(address).values.at(name);
}
78 changes: 32 additions & 46 deletions can/parser_pyx.pyx
Original file line number Diff line number Diff line change
@@ -1,25 +1,39 @@
# distutils: language = c++
# cython: c_string_encoding=ascii, language_level=3

from cython.operator cimport dereference as deref, preincrement as preinc
from libcpp.pair cimport pair
from libcpp.map cimport map
from libcpp.string cimport string
from libcpp.vector cimport vector
from libc.stdint cimport uint32_t

from .common cimport CANParser as cpp_CANParser
from .common cimport dbc_lookup, SignalValue, DBC
from .common cimport dbc_lookup, DBC, Msg

import numbers
from collections import defaultdict


class ValueDict(dict):
def __init__(self, address, func):
super().__init__()
self.address = address
self.func = func

def __getitem__(self, key):
return self.func(self.address, key)

def values(self):
return [self[key] for key in self]

def items(self):
return [(key, self[key]) for key in self]


cdef class CANParser:
cdef:
cpp_CANParser *can
const DBC *dbc
vector[SignalValue] can_values
vector[uint32_t] addresses

cdef readonly:
dict vl
Expand All @@ -37,73 +51,45 @@ cdef class CANParser:
self.vl_all = {}
self.ts_nanos = {}
msg_name_to_address = {}
address_to_msg_name = {}
cdef map[uint32_t, Msg] address_to_msg

for i in range(self.dbc[0].msgs.size()):
msg = self.dbc[0].msgs[i]
name = msg.name.decode("utf8")

msg_name_to_address[name] = msg.address
address_to_msg_name[msg.address] = name
address_to_msg[msg.address] = msg

# Convert message names into addresses and check existence in DBC
cdef vector[pair[uint32_t, int]] message_v
for i in range(len(messages)):
c = messages[i]
address = c[0] if isinstance(c[0], numbers.Number) else msg_name_to_address.get(c[0])
if address not in address_to_msg_name:
if address is None or address_to_msg.count(address) == 0:
raise RuntimeError(f"could not find message {repr(c[0])} in DBC {self.dbc_name}")
message_v.push_back((address, c[1]))
self.addresses.push_back(address)

name = address_to_msg_name[address]
self.vl[address] = {}
msg = address_to_msg[address]
name = msg.name.decode("utf8")
self.vl[address] = ValueDict(address, lambda addr, name: self.can.getValue(addr, name).value)
self.vl[name] = self.vl[address]
self.vl_all[address] = defaultdict(list)
self.vl_all[address] = ValueDict(address, lambda addr, name: self.can.getValue(addr, name).all_values)
self.vl_all[name] = self.vl_all[address]
self.ts_nanos[address] = {}
self.ts_nanos[address] = ValueDict(address, lambda addr, name: self.can.getValue(addr, name).ts_nanos)
self.ts_nanos[name] = self.ts_nanos[address]
for sig in msg.sigs:
name = sig.name.decode("utf8")
self.vl[address][name] = 0
self.vl_all[address][name] = []
self.ts_nanos[address][name] = 0

self.can = new cpp_CANParser(bus, dbc_name, message_v)
self.update_strings([])

def __dealloc__(self):
if self.can:
del self.can

def update_strings(self, strings, sendcan=False):
for address in self.addresses:
self.vl_all[address].clear()

cdef vector[SignalValue] new_vals
cur_address = -1
vl = {}
vl_all = {}
ts_nanos = {}
updated_addrs = set()

self.can.update_strings(strings, new_vals, sendcan)
cdef vector[SignalValue].iterator it = new_vals.begin()
cdef SignalValue* cv
while it != new_vals.end():
cv = &deref(it)

# Check if the address has changed
if cv.address != cur_address:
cur_address = cv.address
vl = self.vl[cur_address]
vl_all = self.vl_all[cur_address]
ts_nanos = self.ts_nanos[cur_address]
updated_addrs.add(cur_address)

# Cast char * directly to unicode
cv_name = <unicode>cv.name
vl[cv_name] = cv.value
vl_all[cv_name] = cv.all_values
ts_nanos[cv_name] = cv.ts_nanos
preinc(it)

return updated_addrs
return self.can.update_strings(strings, sendcan)

@property
def can_valid(self):
Expand Down

0 comments on commit 09a5404

Please sign in to comment.