Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LiDAR sensor fabric planner #60

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions examples/point_robot/setup/sensor_fabric_16.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
interval: 1
name: sensorfabric
robot_type: pointRobotUrdf
n: 3
m_base: 0.4
obst:
exp: 2.0
lam: 2.0
selfCol:
exp: 1.0
lam: 2.0
attractor:
k_psi: 1.0
speed:
ex_factor: 0.1
limits:
lam: 10.0
exp: 3.0
damper:
r_d: 0.80
b:
- 0.1
- 6.5
dynamic: False
number_lidar_rays: 16

25 changes: 25 additions & 0 deletions plannerbenchmark/exec/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ def initializeArgumentParser(self):
self._parser.add_argument(
"--numberRuns", "-n", type=int, default=1, help="Number of runs of the experiment"
)
self._parser.add_argument(
"--show-rays", "-sr", type=bool, default=False, help="Show LiDAR ray visualization by setting to True"
)
self._parser.add_argument("--ros", dest="ros", action="store_true")
self._parser.add_argument("--no-save", dest="save", action="store_false")
self._parser.add_argument("--random-goal", dest="random_goal", action="store_true")
Expand Down Expand Up @@ -94,6 +97,7 @@ def parseArguments(self):

self._ros = args.ros
self._compare = args.compare
self._show_rays = args.show_rays
if self._compare:
self._compareTimeStamp = args.caseSetup[-24:-9]
if self._ros:
Expand Down Expand Up @@ -170,7 +174,16 @@ def run(self):
timeStamp = self._compareTimeStamp
for planner in self._planners:
ob, t0 = self.reset(q0, q0dot)
try:
# Sets the LiDAR data to 100.0 for the first step.
ob['lidarSensor'] = np.full([planner._config.number_lidar_rays*2,], 100.0)
except AttributeError:
print("No LiDAR sensor available to set default values to 100.0")
if not self._ros:
try:
self._experiment.addScene(self._env, planner._config.number_lidar_rays)
except AttributeError:
print("No LiDAR sensor found, using default environment.")
self._experiment.addScene(self._env)
logger = Logger(self._res_folder, timeStamp)
logger.setSetups(self._experiment, planner)
Expand All @@ -197,6 +210,18 @@ def run(self):
if self._experiment.robotType() in ['groundRobot', 'boxer', 'albert']:
qudot = np.concatenate((ob['joint_state']['forward_velocity'], ob['joint_state']['velocity'][2:]))
observation += [qudot]
try:
sensor_data = ob['lidarSensor']
if self._show_rays:
if i == 0:
body_ids_old = None
show_lidar_mode = "spheres"
show_lidar_step = 1
if i in range(1, self._experiment.T(), show_lidar_step):
body_ids_old = self._experiment.showLidar(self._env, sensor_data, q, body_ids_old, planner._config.number_lidar_rays, show_lidar_mode)
observation += [sensor_data]
except KeyError as key_error:
observation += [0]
t_before = time.perf_counter()
action = planner.computeAction(*observation)
solving_time = time.perf_counter() - t_before
Expand Down
10 changes: 9 additions & 1 deletion plannerbenchmark/generic/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,21 @@ def env(self, render=False):
else:
return gym.make(self.envName(), render=render, dt=self.dt())

def addScene(self, env):
def showLidar(self, env, sensor_data, q, body_ids_old, number_lidar_rays, show_lidar_mode):
if show_lidar_mode == "spheres":
body_ids = env.show_lidar_spheres(sensor_data, q, body_ids_old, number_lidar_rays)
return body_ids

def addScene(self, env, nb_rays=0):
for obst in self._obstacles:
env.add_obstacle(obst)
try:
env.add_goal(self.goal())
except Exception as e:
logging.error(f"Error occured when adding goal to the scene, {e}")
if nb_rays > 0:
lidar = Lidar(4, nb_rays=nb_rays, raw_data=False)
env.add_sensor(lidar)

def shuffleInitConfiguration(self):
q0_new = np.random.uniform(low=self.limits()[0], high=self.limits()[1])
Expand Down
4 changes: 4 additions & 0 deletions plannerbenchmark/planner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
from plannerbenchmark.planner.fabricPlanner import FabricPlanner
except ModuleNotFoundError as e:
print(f"Module fabricPlanner not found: {e}")
try:
from plannerbenchmark.planner.sensorFabricPlanner import SensorFabricPlanner
except ModuleNotFoundError as e:
print(f"Module sensorFabricPlanner not found: {e}")
try:
from plannerbenchmark.planner.forcesProMpcPlanner import ForcesProMpcPlanner
except ModuleNotFoundError as e:
Expand Down
167 changes: 167 additions & 0 deletions plannerbenchmark/planner/sensorFabricPlanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# General dependencies
from dataclasses import dataclass, field
from typing import Dict
import numpy as np

# Dependencies for this specific planner
from fabrics.planner.parameterized_planner import ParameterizedFabricPlanner

# Motion planning scenes
from MotionPlanningEnv.dynamicSphereObstacle import DynamicSphereObstacle
from MotionPlanningEnv.sphereObstacle import SphereObstacle
from MotionPlanningGoal.dynamicSubGoal import DynamicSubGoal

# Dependencies on plannerbenchmark
from plannerbenchmark.generic.planner import Planner, PlannerConfig

@dataclass
class SensorFabricConfig(PlannerConfig):
m_base: float = 0.2
m_ratio: float = 1.0
obst: Dict[str, float] = field(default_factory = lambda: ({'exp': 1.0, 'lam': 1.0}))
selfCol: Dict[str, float] = field(default_factory = lambda: ({'exp': 1.0, 'lam': 1.0}))
attractor: Dict[str, float] = field(default_factory = lambda: ({'k_psi': 3.0}))
speed: Dict[str, float] = field(default_factory = lambda : ({'ex_factor': 1.0}))
dynamic: bool = False
limits: Dict[str, float] = field(default_factory = lambda: ({'exp': 1.0, 'lam': 1.0}))
damper: Dict[str, object] = field(default_factory = lambda :({'r_d': 0.8, 'b': [0.1, 6.5]}))
l_offset: float = 0.2
m_arm: float = 1.0
m_rot: float = 1.0
number_lidar_rays: int = 24
radius_ray_obstacles: float = 0.1


class SensorFabricPlanner(Planner):

def __init__(self, exp, **kwargs):
super().__init__(exp, **kwargs)
self._config = SensorFabricConfig(**kwargs)
self.reset()
base_energy: str = (
"0.5 * sym('base_inertia') * ca.dot(xdot, xdot)"
)
collision_geometry: str = (
"-2*sym('obst_geo_lam') / (x**sym('obst_geo_exp')) * (1 - ca.heaviside(xdot)) * xdot**2"
)
collision_finsler: str = (
f"(20.0/{self._config.number_lidar_rays}) / (x**2) * (1 - ca.heaviside(xdot)) * xdot**2"
)
limit_geometry: str = (
"-0.1 / (x ** 1) * xdot ** 2"
)
limit_finsler: str = (
"1.0/(x**1) * (-0.5 * (ca.sign(xdot) - 1)) * xdot**2"
)
self_collision_geometry: str = (
"-sym('self_geo_lam') / (x ** sym('self_geo_exp')) * xdot ** 2"
)
self_collision_finsler: str = (
"1.0/(x**1) * (-0.5 * (ca.sign(xdot) - 1)) * xdot**2"
)
attractor_potential: str = (
"5.0 * (ca.norm_2(x) + 1 / 10 * ca.log(1 + ca.exp(-2 * 10 * ca.norm_2(x))))"
)
attractor_metric: str = (
"((2.0 - 0.3) * ca.exp(-1 * (0.75 * ca.norm_2(x))**2) + 0.3) * ca.SX(np.identity(x.size()[0]))"
)
self._planner = ParameterizedFabricPlanner(
self.config.n,
self._exp.robotType(),
base_energy=base_energy,
collision_geometry=collision_geometry,
collision_finsler=collision_finsler,
self_collision_geometry=self_collision_geometry,
self_collision_finsler=self_collision_finsler,
attractor_potential=attractor_potential,
attractor_metric=attractor_metric,
)
self._collision_links = [i for i in range(1, self.config.n+1)]
self._collision_links = [1]
self._number_static_obstacles = self._config.number_lidar_rays
self._dynamic_goal = False

def initialize_runtime_arguments(self):
self._runtime_arguments = {}
self._runtime_arguments['weight_goal_0'] = np.array(self.config.attractor['k_psi'])
self._runtime_arguments['base_inertia'] = np.array([self.config.m_base])
for j in range(self._number_static_obstacles):
for i in self._collision_links:
self._runtime_arguments[f'obst_geo_exp_obst_{j}_{i}_leaf'] = np.array([self.config.obst['exp']])
self._runtime_arguments[f'obst_geo_lam_obst_{j}_{i}_leaf'] = np.array([self.config.obst['lam']])
self._runtime_arguments[f'radius_body_{i}'] = np.array([self._r_body])
for j in range(self._number_dynamic_obstacles):
self._runtime_arguments[f'radius_dynamic_obst_{j}'] = np.array([self._dynamic_obsts[j].radius()])
for i in self._collision_links:
self._runtime_arguments[f'obst_geo_exp_dynamic_obst_{j}_{i}_leaf'] = np.array([self.config.obst['exp']])
self._runtime_arguments[f'obst_geo_lam_dynamic_obst_{j}_{i}_leaf'] = np.array([self.config.obst['lam']])
self._runtime_arguments[f'radius_body_{i}'] = np.array([self._r_body])
for link, paired_links in self._self_collision_dict.items():
for paired_link in paired_links:
self._runtime_arguments[f'self_geo_lam_self_collision_{link}_{paired_link}'] = np.array([self.config.selfCol['lam']])
self._runtime_arguments[f'self_geo_exp_self_collision_{link}_{paired_link}'] = np.array([self.config.selfCol['exp']])

def setObstacles(self, obsts, r_body):
self._dynamic_obsts = []
self._static_obsts = []
for obst in obsts:
if isinstance(obst, DynamicSphereObstacle):
self._dynamic_obsts.append(obst)
else:
self._static_obsts.append(obst)
self._number_dynamic_obstacles = 0

def setSelfCollisionAvoidance(self, r_body):
self_collision_pairs = self._exp.selfCollisionPairs()
self._self_collision_dict = {}
for pair in self_collision_pairs:
if pair[0] in self._self_collision_dict.keys():
self._self_collision_dict[pair[0]].append(pair[1])
else:
self._self_collision_dict[pair[0]] = [pair[1]]
self._r_body = r_body

def setJointLimits(self, limits):
self._limits = limits

def setGoal(self, goal):
self._dynamic_goal = isinstance(goal.primary_goal(), DynamicSubGoal)
self._goal = goal

def concretize(self):
self._planner.set_components(
self._collision_links,
self._self_collision_dict,
self._goal,
limits=self._limits,
number_obstacles=self._config.number_lidar_rays,
number_dynamic_obstacles=0,
)
self._planner.concretize()
self.initialize_runtime_arguments()

def adapt_runtime_arguments(self, args):
time = args[3]
self._runtime_arguments['q'] = args[0]
self._runtime_arguments['qdot'] = args[1]
if self._dynamic_goal:
self._runtime_arguments['x_ref_goal_0_leaf'] = np.array(self._goal.primary_goal().position(t = time))
self._runtime_arguments['xdot_ref_goal_0_leaf'] = np.array(self._goal.primary_goal().velocity(t = time))
self._runtime_arguments['xddot_ref_goal_0_leaf'] = np.array(self._goal.primary_goal().acceleration(t = time))
else:
self._runtime_arguments['x_goal_0'] = np.array(self._goal.primary_goal().position())
if len(args) > 2:
ob_lidar = args[2].reshape(self._config.number_lidar_rays, 2) + args[0][0:2]
ob_lidar = np.append(ob_lidar, np.zeros((self._config.number_lidar_rays, 1)), axis=1)
else:
ob_lidar = [[100, 100, 100],] * self._config.number_lidar_rays
for j in range(self._config.number_lidar_rays):
self._runtime_arguments[f'radius_obst_{j}'] = np.array([self._config.radius_ray_obstacles])
self._runtime_arguments[f'x_obst_{j}'] = ob_lidar[j]

def computeAction(self, *args):
self.adapt_runtime_arguments(args)
action = np.zeros(3)
action = self._planner.compute_action(**self._runtime_arguments)
return action