diff --git a/can/common.h b/can/common.h index 80897966f8..125f4feb64 100644 --- a/can/common.h +++ b/can/common.h @@ -43,8 +43,7 @@ class MessageState { unsigned int size; std::vector parse_sigs; - std::vector vals; - std::vector> all_vals; + std::map values; uint64_t last_seen_nanos; uint64_t check_threshold; @@ -66,6 +65,7 @@ class CANParser { const DBC *dbc = NULL; std::unordered_map message_states; + kj::ArrayPtr getAlignedData(const std::string &data); public: bool can_valid = false; @@ -79,14 +79,15 @@ class CANParser { CANParser(int abus, const std::string& dbc_name, const std::vector> &messages); CANParser(int abus, const std::string& dbc_name, bool ignore_checksum, bool ignore_counter); + SignalValue &getValue(uint32_t address, std::string &name); #ifndef DYNAMIC_CAPNP void update_string(const std::string &data, bool sendcan); - void update_strings(const std::vector &data, std::vector &vals, bool sendcan); + std::vector update_strings(const std::vector &data, bool sendcan); void UpdateCans(uint64_t nanos, const capnp::List::Reader& cans); #endif void UpdateCans(uint64_t nanos, const capnp::DynamicStruct::Reader& cans); void UpdateValid(uint64_t nanos); - void query_latest(std::vector &vals, uint64_t last_ts = 0); + std::vector query_latest(uint64_t last_ts = 0); }; class CANPacker { diff --git a/can/common.pxd b/can/common.pxd index 57053b781e..5b635a783f 100644 --- a/can/common.pxd +++ b/can/common.pxd @@ -50,9 +50,7 @@ cdef extern from "common_dbc.h": vector[Val] vals cdef struct SignalValue: - uint32_t address uint64_t ts_nanos - string name double value vector[double] all_values @@ -68,7 +66,8 @@ cdef extern from "common.h": bool can_valid bool bus_timeout CANParser(int, string, vector[pair[uint32_t, int]]) except + - void update_strings(vector[string]&, vector[SignalValue]&, bool) except + + SignalValue &getValue(uint32_t, string&) + vector[uint32_t] update_strings(vector[string]&, bool) except + cdef cppclass CANPacker: CANPacker(string) diff --git a/can/common_dbc.h b/can/common_dbc.h index ef4c98c803..5ec007205c 100644 --- a/can/common_dbc.h +++ b/can/common_dbc.h @@ -11,9 +11,7 @@ struct SignalPackValue { }; struct SignalValue { - uint32_t address; uint64_t ts_nanos; - std::string name; double value; // latest value std::vector all_values; // all values from this cycle }; diff --git a/can/parser.cc b/can/parser.cc index 3efa3ee369..37ccd9855d 100644 --- a/can/parser.cc +++ b/can/parser.cc @@ -37,7 +37,7 @@ bool MessageState::parse(uint64_t nanos, const std::vector &dat) { bool checksum_failed = false; bool counter_failed = false; - for (int i = 0; i < parse_sigs.size(); i++) { + for (int i = 0; i < parse_sigs.size(); ++i) { const auto &sig = parse_sigs[i]; int64_t tmp = get_raw_value(dat, sig); @@ -68,9 +68,14 @@ bool MessageState::parse(uint64_t nanos, const std::vector &dat) { return false; } - for (int i = 0; i < parse_sigs.size(); i++) { - vals[i] = tmp_vals[i]; - all_vals[i].push_back(vals[i]); + // Update values for each signal + for (int i = 0; i < parse_sigs.size(); ++i) { + // Retrieve the value entry for the current signal + auto &val = values[parse_sigs[i].name]; + + val.value = tmp_vals[i]; + val.ts_nanos = nanos; + val.all_values.push_back(val.value); } last_seen_nanos = nanos; @@ -138,8 +143,9 @@ CANParser::CANParser(int abus, const std::string& dbc_name, const std::vectorsigs; - state.vals.resize(msg->sigs.size()); - state.all_vals.resize(msg->sigs.size()); + for (auto &sig : msg->sigs) { + state.values[sig.name] = {}; + } } } @@ -160,27 +166,36 @@ CANParser::CANParser(int abus, const std::string& dbc_name, bool ignore_checksum .ignore_counter = ignore_counter, }; - for (const auto& sig : msg.sigs) { - state.parse_sigs.push_back(sig); - state.vals.push_back(0); - state.all_vals.push_back({}); - } - + state.parse_sigs = msg.sigs; message_states[state.address] = state; + // Initialize value entries for each signal in the message + for (auto &sig : msg.sigs) { + message_states[state.address].values[sig.name] = {}; + } } } #ifndef DYNAMIC_CAPNP -void CANParser::update_string(const std::string &data, bool sendcan) { - // format for board, make copy due to alignment issues. + +// If the input data is already aligned to capnp::word boundaries, +// a direct ArrayPtr is returned. Otherwise, a new ArrayPtr is created +// with the data copied into it, ensuring alignment. +kj::ArrayPtr CANParser::getAlignedData(const std::string &data) { + bool aligned = reinterpret_cast(data.data()) % sizeof(capnp::word) == 0; + if (aligned) { + return kj::ArrayPtr((capnp::word*)(data.data()), data.size() / sizeof(capnp::word)); + } + const size_t buf_size = (data.length() / sizeof(capnp::word)) + 1; if (aligned_buf.size() < buf_size) { aligned_buf = kj::heapArray(buf_size); } memcpy(aligned_buf.begin(), data.data(), data.length()); + return aligned_buf.slice(0, buf_size); +} - // extract the messages - capnp::FlatArrayMessageReader cmsg(aligned_buf.slice(0, buf_size)); +void CANParser::update_string(const std::string &data, bool sendcan) { + capnp::FlatArrayMessageReader cmsg(getAlignedData(data)); cereal::Event::Reader event = cmsg.getRoot(); if (first_nanos == 0) { @@ -194,7 +209,18 @@ void CANParser::update_string(const std::string &data, bool sendcan) { UpdateValid(last_nanos); } -void CANParser::update_strings(const std::vector &data, std::vector &vals, bool sendcan) { +std::vector CANParser::update_strings(const std::vector &data, bool sendcan) { + // Clear all_values + for (auto &state : message_states) { + for (auto &value : state.second.values) { + value.second.all_values.clear(); + } + } + + if (data.empty()) { + return {}; + } + uint64_t current_nanos = 0; for (const auto &d : data) { update_string(d, sendcan); @@ -202,16 +228,16 @@ void CANParser::update_strings(const std::vector &data, std::vector current_nanos = last_nanos; } } - query_latest(vals, current_nanos); + return query_latest(current_nanos); } void CANParser::UpdateCans(uint64_t nanos, const capnp::List::Reader& cans) { //DEBUG("got %d messages\n", cans.size()); - + std::vector data; bool bus_empty = true; // parse the messages - for (const auto cmsg : cans) { + for (const auto &cmsg : cans) { if (cmsg.getSrc() != bus) { // DEBUG("skip %d: wrong bus\n", cmsg.getAddress()); continue; @@ -225,7 +251,6 @@ void CANParser::UpdateCans(uint64_t nanos, const capnp::List::R } auto dat = cmsg.getDat(); - if (dat.size() > 64) { DEBUG("got message longer than 64 bytes: 0x%X %zu\n", cmsg.getAddress(), dat.size()); continue; @@ -236,9 +261,7 @@ void CANParser::UpdateCans(uint64_t nanos, const capnp::List::R // DEBUG("got message with unexpected length: expected %d, got %zu for %d", state_it->second.size, dat.size(), cmsg.getAddress()); // continue; //} - - std::vector data(dat.size(), 0); - memcpy(data.data(), dat.begin(), dat.size()); + data.assign(dat.begin(), dat.end()); state_it->second.parse(nanos, data); } @@ -301,25 +324,18 @@ void CANParser::UpdateValid(uint64_t nanos) { can_valid = (can_invalid_cnt < CAN_INVALID_CNT) && _counters_valid; } -void CANParser::query_latest(std::vector &vals, uint64_t last_ts) { - if (last_ts == 0) { - last_ts = last_nanos; - } - for (auto& kv : message_states) { - auto& state = kv.second; - if (last_ts != 0 && state.last_seen_nanos < last_ts) { - continue; - } - - for (int i = 0; i < state.parse_sigs.size(); i++) { - const Signal &sig = state.parse_sigs[i]; - SignalValue &v = vals.emplace_back(); - v.address = state.address; - v.ts_nanos = state.last_seen_nanos; - v.name = sig.name; - v.value = state.vals[i]; - v.all_values = state.all_vals[i]; - state.all_vals[i].clear(); +// Retrieve addresses of messages seen since the last_ts timestamp. +std::vector CANParser::query_latest(uint64_t last_ts) { + std::vector result; + result.reserve(message_states.size()); + for (const auto& [addr, state] : message_states) { + if (state.last_seen_nanos >= last_ts) { + result.push_back(addr); } } + return result; +} + +SignalValue &CANParser::getValue(uint32_t address, std::string &name) { + return message_states.at(address).values.at(name); } diff --git a/can/parser_pyx.pyx b/can/parser_pyx.pyx index eddad18ee7..77de69b900 100644 --- a/can/parser_pyx.pyx +++ b/can/parser_pyx.pyx @@ -1,25 +1,39 @@ # distutils: language = c++ # cython: c_string_encoding=ascii, language_level=3 -from cython.operator cimport dereference as deref, preincrement as preinc from libcpp.pair cimport pair +from libcpp.map cimport map from libcpp.string cimport string from libcpp.vector cimport vector from libc.stdint cimport uint32_t from .common cimport CANParser as cpp_CANParser -from .common cimport dbc_lookup, SignalValue, DBC +from .common cimport dbc_lookup, DBC, Msg import numbers from collections import defaultdict +class ValueDict(dict): + def __init__(self, address, func): + super().__init__() + self.address = address + self.func = func + + def __getitem__(self, key): + return self.func(self.address, key) + + def values(self): + return [self[key] for key in self] + + def items(self): + return [(key, self[key]) for key in self] + + cdef class CANParser: cdef: cpp_CANParser *can const DBC *dbc - vector[SignalValue] can_values - vector[uint32_t] addresses cdef readonly: dict vl @@ -37,73 +51,45 @@ cdef class CANParser: self.vl_all = {} self.ts_nanos = {} msg_name_to_address = {} - address_to_msg_name = {} + cdef map[uint32_t, Msg] address_to_msg for i in range(self.dbc[0].msgs.size()): msg = self.dbc[0].msgs[i] name = msg.name.decode("utf8") - msg_name_to_address[name] = msg.address - address_to_msg_name[msg.address] = name + address_to_msg[msg.address] = msg # Convert message names into addresses and check existence in DBC cdef vector[pair[uint32_t, int]] message_v for i in range(len(messages)): c = messages[i] address = c[0] if isinstance(c[0], numbers.Number) else msg_name_to_address.get(c[0]) - if address not in address_to_msg_name: + if address is None or address_to_msg.count(address) == 0: raise RuntimeError(f"could not find message {repr(c[0])} in DBC {self.dbc_name}") message_v.push_back((address, c[1])) - self.addresses.push_back(address) - name = address_to_msg_name[address] - self.vl[address] = {} + msg = address_to_msg[address] + name = msg.name.decode("utf8") + self.vl[address] = ValueDict(address, lambda addr, name: self.can.getValue(addr, name).value) self.vl[name] = self.vl[address] - self.vl_all[address] = defaultdict(list) + self.vl_all[address] = ValueDict(address, lambda addr, name: self.can.getValue(addr, name).all_values) self.vl_all[name] = self.vl_all[address] - self.ts_nanos[address] = {} + self.ts_nanos[address] = ValueDict(address, lambda addr, name: self.can.getValue(addr, name).ts_nanos) self.ts_nanos[name] = self.ts_nanos[address] + for sig in msg.sigs: + name = sig.name.decode("utf8") + self.vl[address][name] = 0 + self.vl_all[address][name] = [] + self.ts_nanos[address][name] = 0 self.can = new cpp_CANParser(bus, dbc_name, message_v) - self.update_strings([]) def __dealloc__(self): if self.can: del self.can def update_strings(self, strings, sendcan=False): - for address in self.addresses: - self.vl_all[address].clear() - - cdef vector[SignalValue] new_vals - cur_address = -1 - vl = {} - vl_all = {} - ts_nanos = {} - updated_addrs = set() - - self.can.update_strings(strings, new_vals, sendcan) - cdef vector[SignalValue].iterator it = new_vals.begin() - cdef SignalValue* cv - while it != new_vals.end(): - cv = &deref(it) - - # Check if the address has changed - if cv.address != cur_address: - cur_address = cv.address - vl = self.vl[cur_address] - vl_all = self.vl_all[cur_address] - ts_nanos = self.ts_nanos[cur_address] - updated_addrs.add(cur_address) - - # Cast char * directly to unicode - cv_name = cv.name - vl[cv_name] = cv.value - vl_all[cv_name] = cv.all_values - ts_nanos[cv_name] = cv.ts_nanos - preinc(it) - - return updated_addrs + return self.can.update_strings(strings, sendcan) @property def can_valid(self):