-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
14 changed files
with
2,676 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,278 @@ | ||
# functions common among cars | ||
from collections import namedtuple | ||
from dataclasses import dataclass | ||
from enum import IntFlag, ReprEnum, EnumType | ||
from dataclasses import replace | ||
|
||
import capnp | ||
|
||
from cereal import car | ||
from openpilot.common.numpy_fast import clip, interp | ||
from openpilot.common.utils import Freezable | ||
from openpilot.selfdrive.car.docs_definitions import CarDocs | ||
|
||
|
||
# kg of standard extra cargo to count for drive, gas, etc... | ||
STD_CARGO_KG = 136. | ||
|
||
ButtonType = car.CarState.ButtonEvent.Type | ||
EventName = car.CarEvent.EventName | ||
AngleRateLimit = namedtuple('AngleRateLimit', ['speed_bp', 'angle_v']) | ||
|
||
|
||
def apply_hysteresis(val: float, val_steady: float, hyst_gap: float) -> float: | ||
if val > val_steady + hyst_gap: | ||
val_steady = val - hyst_gap | ||
elif val < val_steady - hyst_gap: | ||
val_steady = val + hyst_gap | ||
return val_steady | ||
|
||
|
||
def create_button_events(cur_btn: int, prev_btn: int, buttons_dict: dict[int, capnp.lib.capnp._EnumModule], | ||
unpressed_btn: int = 0) -> list[capnp.lib.capnp._DynamicStructBuilder]: | ||
events: list[capnp.lib.capnp._DynamicStructBuilder] = [] | ||
|
||
if cur_btn == prev_btn: | ||
return events | ||
|
||
# Add events for button presses, multiple when a button switches without going to unpressed | ||
for pressed, btn in ((False, prev_btn), (True, cur_btn)): | ||
if btn != unpressed_btn: | ||
events.append(car.CarState.ButtonEvent(pressed=pressed, | ||
type=buttons_dict.get(btn, ButtonType.unknown))) | ||
return events | ||
|
||
|
||
def gen_empty_fingerprint(): | ||
return {i: {} for i in range(8)} | ||
|
||
|
||
# these params were derived for the Civic and used to calculate params for other cars | ||
class VehicleDynamicsParams: | ||
MASS = 1326. + STD_CARGO_KG | ||
WHEELBASE = 2.70 | ||
CENTER_TO_FRONT = WHEELBASE * 0.4 | ||
CENTER_TO_REAR = WHEELBASE - CENTER_TO_FRONT | ||
ROTATIONAL_INERTIA = 2500 | ||
TIRE_STIFFNESS_FRONT = 192150 | ||
TIRE_STIFFNESS_REAR = 202500 | ||
|
||
|
||
# TODO: get actual value, for now starting with reasonable value for | ||
# civic and scaling by mass and wheelbase | ||
def scale_rot_inertia(mass, wheelbase): | ||
return VehicleDynamicsParams.ROTATIONAL_INERTIA * mass * wheelbase ** 2 / (VehicleDynamicsParams.MASS * VehicleDynamicsParams.WHEELBASE ** 2) | ||
|
||
|
||
# TODO: start from empirically derived lateral slip stiffness for the civic and scale by | ||
# mass and CG position, so all cars will have approximately similar dyn behaviors | ||
def scale_tire_stiffness(mass, wheelbase, center_to_front, tire_stiffness_factor): | ||
center_to_rear = wheelbase - center_to_front | ||
tire_stiffness_front = (VehicleDynamicsParams.TIRE_STIFFNESS_FRONT * tire_stiffness_factor) * mass / VehicleDynamicsParams.MASS * \ | ||
(center_to_rear / wheelbase) / (VehicleDynamicsParams.CENTER_TO_REAR / VehicleDynamicsParams.WHEELBASE) | ||
|
||
tire_stiffness_rear = (VehicleDynamicsParams.TIRE_STIFFNESS_REAR * tire_stiffness_factor) * mass / VehicleDynamicsParams.MASS * \ | ||
(center_to_front / wheelbase) / (VehicleDynamicsParams.CENTER_TO_FRONT / VehicleDynamicsParams.WHEELBASE) | ||
|
||
return tire_stiffness_front, tire_stiffness_rear | ||
|
||
|
||
DbcDict = dict[str, str] | ||
|
||
|
||
def dbc_dict(pt_dbc, radar_dbc, chassis_dbc=None, body_dbc=None) -> DbcDict: | ||
return {'pt': pt_dbc, 'radar': radar_dbc, 'chassis': chassis_dbc, 'body': body_dbc} | ||
|
||
|
||
def apply_driver_steer_torque_limits(apply_torque, apply_torque_last, driver_torque, LIMITS): | ||
|
||
# limits due to driver torque | ||
driver_max_torque = LIMITS.STEER_MAX + (LIMITS.STEER_DRIVER_ALLOWANCE + driver_torque * LIMITS.STEER_DRIVER_FACTOR) * LIMITS.STEER_DRIVER_MULTIPLIER | ||
driver_min_torque = -LIMITS.STEER_MAX + (-LIMITS.STEER_DRIVER_ALLOWANCE + driver_torque * LIMITS.STEER_DRIVER_FACTOR) * LIMITS.STEER_DRIVER_MULTIPLIER | ||
max_steer_allowed = max(min(LIMITS.STEER_MAX, driver_max_torque), 0) | ||
min_steer_allowed = min(max(-LIMITS.STEER_MAX, driver_min_torque), 0) | ||
apply_torque = clip(apply_torque, min_steer_allowed, max_steer_allowed) | ||
|
||
# slow rate if steer torque increases in magnitude | ||
if apply_torque_last > 0: | ||
apply_torque = clip(apply_torque, max(apply_torque_last - LIMITS.STEER_DELTA_DOWN, -LIMITS.STEER_DELTA_UP), | ||
apply_torque_last + LIMITS.STEER_DELTA_UP) | ||
else: | ||
apply_torque = clip(apply_torque, apply_torque_last - LIMITS.STEER_DELTA_UP, | ||
min(apply_torque_last + LIMITS.STEER_DELTA_DOWN, LIMITS.STEER_DELTA_UP)) | ||
|
||
return int(round(float(apply_torque))) | ||
|
||
|
||
def apply_dist_to_meas_limits(val, val_last, val_meas, | ||
STEER_DELTA_UP, STEER_DELTA_DOWN, | ||
STEER_ERROR_MAX, STEER_MAX): | ||
# limits due to comparison of commanded val VS measured val (torque/angle/curvature) | ||
max_lim = min(max(val_meas + STEER_ERROR_MAX, STEER_ERROR_MAX), STEER_MAX) | ||
min_lim = max(min(val_meas - STEER_ERROR_MAX, -STEER_ERROR_MAX), -STEER_MAX) | ||
|
||
val = clip(val, min_lim, max_lim) | ||
|
||
# slow rate if val increases in magnitude | ||
if val_last > 0: | ||
val = clip(val, | ||
max(val_last - STEER_DELTA_DOWN, -STEER_DELTA_UP), | ||
val_last + STEER_DELTA_UP) | ||
else: | ||
val = clip(val, | ||
val_last - STEER_DELTA_UP, | ||
min(val_last + STEER_DELTA_DOWN, STEER_DELTA_UP)) | ||
|
||
return float(val) | ||
|
||
|
||
def apply_meas_steer_torque_limits(apply_torque, apply_torque_last, motor_torque, LIMITS): | ||
return int(round(apply_dist_to_meas_limits(apply_torque, apply_torque_last, motor_torque, | ||
LIMITS.STEER_DELTA_UP, LIMITS.STEER_DELTA_DOWN, | ||
LIMITS.STEER_ERROR_MAX, LIMITS.STEER_MAX))) | ||
|
||
|
||
def apply_std_steer_angle_limits(apply_angle, apply_angle_last, v_ego, LIMITS): | ||
# pick angle rate limits based on wind up/down | ||
steer_up = apply_angle_last * apply_angle >= 0. and abs(apply_angle) > abs(apply_angle_last) | ||
rate_limits = LIMITS.ANGLE_RATE_LIMIT_UP if steer_up else LIMITS.ANGLE_RATE_LIMIT_DOWN | ||
|
||
angle_rate_lim = interp(v_ego, rate_limits.speed_bp, rate_limits.angle_v) | ||
return clip(apply_angle, apply_angle_last - angle_rate_lim, apply_angle_last + angle_rate_lim) | ||
|
||
|
||
def common_fault_avoidance(fault_condition: bool, request: bool, above_limit_frames: int, | ||
max_above_limit_frames: int, max_mismatching_frames: int = 1): | ||
""" | ||
Several cars have the ability to work around their EPS limits by cutting the | ||
request bit of their LKAS message after a certain number of frames above the limit. | ||
""" | ||
|
||
# Count up to max_above_limit_frames, at which point we need to cut the request for above_limit_frames to avoid a fault | ||
if request and fault_condition: | ||
above_limit_frames += 1 | ||
else: | ||
above_limit_frames = 0 | ||
|
||
# Once we cut the request bit, count additionally to max_mismatching_frames before setting the request bit high again. | ||
# Some brands do not respect our workaround without multiple messages on the bus, for example | ||
if above_limit_frames > max_above_limit_frames: | ||
request = False | ||
|
||
if above_limit_frames >= max_above_limit_frames + max_mismatching_frames: | ||
above_limit_frames = 0 | ||
|
||
return above_limit_frames, request | ||
|
||
|
||
def make_can_msg(addr, dat, bus): | ||
return [addr, 0, dat, bus] | ||
|
||
|
||
def get_safety_config(safety_model, safety_param = None): | ||
ret = car.CarParams.SafetyConfig.new_message() | ||
ret.safetyModel = safety_model | ||
if safety_param is not None: | ||
ret.safetyParam = safety_param | ||
return ret | ||
|
||
|
||
class CanBusBase: | ||
offset: int | ||
|
||
def __init__(self, CP, fingerprint: dict[int, dict[int, int]] | None) -> None: | ||
if CP is None: | ||
assert fingerprint is not None | ||
num = max([k for k, v in fingerprint.items() if len(v)], default=0) // 4 + 1 | ||
else: | ||
num = len(CP.safetyConfigs) | ||
self.offset = 4 * (num - 1) | ||
|
||
|
||
class CanSignalRateCalculator: | ||
""" | ||
Calculates the instantaneous rate of a CAN signal by using the counter | ||
variable and the known frequency of the CAN message that contains it. | ||
""" | ||
def __init__(self, frequency): | ||
self.frequency = frequency | ||
self.previous_counter = 0 | ||
self.previous_value = 0 | ||
self.rate = 0 | ||
|
||
def update(self, current_value, current_counter): | ||
if current_counter != self.previous_counter: | ||
self.rate = (current_value - self.previous_value) * self.frequency | ||
|
||
self.previous_counter = current_counter | ||
self.previous_value = current_value | ||
|
||
return self.rate | ||
|
||
|
||
@dataclass(frozen=True, kw_only=True) | ||
class CarSpecs: | ||
mass: float # kg, curb weight | ||
wheelbase: float # meters | ||
steerRatio: float | ||
centerToFrontRatio: float = 0.5 | ||
minSteerSpeed: float = 0.0 # m/s | ||
minEnableSpeed: float = -1.0 # m/s | ||
tireStiffnessFactor: float = 1.0 | ||
|
||
def override(self, **kwargs): | ||
return replace(self, **kwargs) | ||
|
||
|
||
@dataclass(order=True) | ||
class PlatformConfig(Freezable): | ||
car_docs: list[CarDocs] | ||
specs: CarSpecs | ||
|
||
dbc_dict: DbcDict | ||
|
||
flags: int = 0 | ||
|
||
platform_str: str | None = None | ||
|
||
def __hash__(self) -> int: | ||
return hash(self.platform_str) | ||
|
||
def override(self, **kwargs): | ||
return replace(self, **kwargs) | ||
|
||
def init(self): | ||
pass | ||
|
||
def __post_init__(self): | ||
self.init() | ||
|
||
|
||
class PlatformsType(EnumType): | ||
def __new__(metacls, cls, bases, classdict, *, boundary=None, _simple=False, **kwds): | ||
for key in classdict._member_names.keys(): | ||
cfg: PlatformConfig = classdict[key] | ||
cfg.platform_str = key | ||
cfg.freeze() | ||
return super().__new__(metacls, cls, bases, classdict, boundary=boundary, _simple=_simple, **kwds) | ||
|
||
|
||
class Platforms(str, ReprEnum, metaclass=PlatformsType): | ||
config: PlatformConfig | ||
|
||
def __new__(cls, platform_config: PlatformConfig): | ||
member = str.__new__(cls, platform_config.platform_str) | ||
member.config = platform_config | ||
member._value_ = platform_config.platform_str | ||
return member | ||
|
||
def __repr__(self): | ||
return f"<{self.__class__.__name__}.{self.name}>" | ||
|
||
@classmethod | ||
def create_dbc_map(cls) -> dict[str, DbcDict]: | ||
return {p: p.config.dbc_dict for p in cls} | ||
|
||
@classmethod | ||
def with_flags(cls, flags: IntFlag) -> set['Platforms']: | ||
return {p for p in cls if p.config.flags & flags} |
Oops, something went wrong.