Skip to content

Commit

Permalink
Fix Xbox controller; introduce Takeover policy w/o brake (metadrivers…
Browse files Browse the repository at this point in the history
…e#615)

* try to have better feeling in keyboard

* allow detecting takeover when pressing space in the keyboard

* revert the manual controller back

* revert to old throttle behavior?

* revert to old throttle behavior?

* add a new class

* add a new class

* Update XBox controller (seems pygame key mapping is wrong?)

* clean
  • Loading branch information
pengzhenghao authored Jan 25, 2024
1 parent 08b8b92 commit 2949cd1
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 58 deletions.
115 changes: 66 additions & 49 deletions metadrive/engine/core/manual_controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import math

import numpy as np
from direct.controls.InputState import InputState

Expand Down Expand Up @@ -27,6 +28,7 @@ def process_others(self, *args, **kwargs):
class KeyboardController(Controller):
STEERING_INCREMENT = 0.04
STEERING_DECAY = 0.25
STEERING_INCREMENT_WHEN_INVERSE_DIRECTION = 0.25

THROTTLE_INCREMENT = 0.1
THROTTLE_DECAY = 0.2
Expand All @@ -44,74 +46,77 @@ def __init__(self, pygame_control):
self.inputs.watchWithModifiers('reverse', 's')
self.inputs.watchWithModifiers('turnLeft', 'a')
self.inputs.watchWithModifiers('turnRight', 'd')
self.inputs.watchWithModifiers('takeover', 'space')
self.steering = 0.
self.throttle_brake = 0.
self.takeover = False
self.np_random = np.random.RandomState(None)

def process_input(self, vehicle):
if not self.pygame_control:
steering = 0.
throttle_brake = 0.
if not self.inputs.isSet('turnLeft') and not self.inputs.isSet('turnRight'):
steering = 0.
else:
if self.inputs.isSet('turnLeft'):
steering = 1.0
if self.inputs.isSet('turnRight'):
steering = -1.0
if not self.inputs.isSet('forward') and not self.inputs.isSet("reverse"):
throttle_brake = 0.
left_key_pressed = right_key_pressed = up_key_pressed = down_key_pressed = False
if self.inputs.isSet('turnLeft'):
left_key_pressed = True
if self.inputs.isSet('turnRight'):
right_key_pressed = True
if self.inputs.isSet('forward'):
up_key_pressed = True
if self.inputs.isSet('reverse'):
down_key_pressed = True
if self.inputs.isSet('takeover'):
self.takeover = True
else:
if self.inputs.isSet('forward'):
throttle_brake = 1.0
if self.inputs.isSet('reverse'):
throttle_brake = -1.0
self.takeover = False
else:
steering = 0.
throttle_brake = 0.
key_press = pygame.key.get_pressed()
throttle_brake += key_press[pygame.K_w] - key_press[pygame.K_s]
steering += key_press[pygame.K_a] - key_press[pygame.K_d]

self.further_process(steering, throttle_brake)

return np.array([self.steering, self.throttle_brake], dtype=np.float64)

def further_process(self, steering, throttle_brake):
if steering == 0.:
left_key_pressed = key_press[pygame.K_a]
right_key_pressed = key_press[pygame.K_d]
up_key_pressed = key_press[pygame.K_w]
down_key_pressed = key_press[pygame.K_s]
# TODO: We haven't implement takeover event when using Pygame renderer.

# If no left or right is pressed, steering decays to the center.
if not (left_key_pressed or right_key_pressed):
if self.steering > 0.:
self.steering -= self.STEERING_DECAY
self.steering = max(0., self.steering)
elif self.steering < 0.:
self.steering += self.STEERING_DECAY
self.steering = min(0., self.steering)
if throttle_brake == 0.:
elif left_key_pressed:
if self.steering >= 0.0: # If left is pressed and steering is in left, increment the steering a little bit.
self.steering += self.STEERING_INCREMENT
else: # If left is pressed but steering is in right, steering back to left side a little faster.
self.steering += self.STEERING_INCREMENT_WHEN_INVERSE_DIRECTION
elif right_key_pressed:
if self.steering <= 0.: # If right is pressed and steering is in right, increment the steering a little
self.steering -= self.STEERING_INCREMENT
else: # If right is pressed but steering is in left, steering back to right side a little faster.
self.steering -= self.STEERING_INCREMENT_WHEN_INVERSE_DIRECTION

# If no up or down is pressed, throttle decays to the center.
if not (up_key_pressed or down_key_pressed):
if self.throttle_brake > 0.:
self.throttle_brake -= self.THROTTLE_DECAY
self.throttle_brake = max(self.throttle_brake, 0.)
elif self.throttle_brake < 0.:
self.throttle_brake += self.BRAKE_DECAY
self.throttle_brake = min(0., self.throttle_brake)

if steering > 0.:
self.steering += self.STEERING_INCREMENT if self.steering > 0. else self.STEERING_DECAY
elif steering < 0.:
self.steering -= self.STEERING_INCREMENT if self.steering < 0. else self.STEERING_DECAY

if throttle_brake > 0.:
elif up_key_pressed:
self.throttle_brake = max(self.throttle_brake, 0.)
self.throttle_brake += self.THROTTLE_INCREMENT
elif throttle_brake < 0.:
elif down_key_pressed:
self.throttle_brake = min(self.throttle_brake, 0.)
self.throttle_brake -= self.BRAKE_INCREMENT

rand = self.np_random.rand() / 10000
# self.throttle_brake += rand[0]
self.steering += rand

self.throttle_brake = min(max(-1., self.throttle_brake), 1.)
self.steering = min(max(-1., self.steering), 1.)

return np.array([self.steering, self.throttle_brake], dtype=np.float64)

def process_others(self, takeover_callback=None):
"""This function allows the outer loop to call callback if some signal is received by the controller."""
if (takeover_callback is None) or (not self.pygame_control) or (not pygame.get_init()):
Expand Down Expand Up @@ -191,17 +196,23 @@ def process_input(self, vehicle):
class XboxController(Controller):
"""Control class for Xbox wireless controller
Accept both wired and wireless connection
Max steering, throttle, and break are bound by _discount
Max steering, throttle, and break are bound by _discount.
See https://www.pygame.org/docs/ref/joystick.html#xbox-360-controller-pygame-2-x for key mapping.
"""
STEERING_DISCOUNT = 0.5
THROTTLE_DISCOUNT = 0.5
THROTTLE_DISCOUNT = 0.4
BREAK_DISCOUNT = 0.5
BUTTON_X_MAP = 2
BUTTON_Y_MAP = 3

BUTTON_A_MAP = 0
BUTTON_B_MAP = 1
TRIGGER_RIGHT_MAP = 5
TRIGGER_LEFT_MAP = 2
BUTTON_X_MAP = 2
BUTTON_Y_MAP = 3

STEERING_AXIS = 0 # Left stick left-right direction.
THROTTLE_AXIS = 3 # Right stick up-down direction.
TAKEOVER_AXIS_2 = 4 # Right trigger
TAKEOVER_AXIS_1 = 5 # Left trigger

def __init__(self):
try:
Expand Down Expand Up @@ -233,19 +244,25 @@ def __init__(self):

def process_input(self, vehicle):
pygame.event.pump()
steering = -self.joystick.get_axis(0)
steering = -self.joystick.get_axis(self.STEERING_AXIS)
if abs(steering) < 0.05:
steering = 0
elif steering < 0:
steering = -(math.pow(2, abs(steering) * self.STEERING_DISCOUNT) - 1)
else:
steering = math.pow(2, abs(steering) * self.STEERING_DISCOUNT) - 1
raw_throttle = self.joystick.get_axis(self.TRIGGER_RIGHT_MAP)
raw_brake = self.joystick.get_axis(self.TRIGGER_LEFT_MAP)
# 1+raw_throttle will map throttle between 0,2 need *0.5 to bound it between 0,1
throttle = (1 + raw_throttle) * 0.5 * self.THROTTLE_DISCOUNT
brake = (1 + raw_brake) * 0.5 * self.BREAK_DISCOUNT
throttle_brake = throttle - brake

raw_throttle_brake = -self.joystick.get_axis(self.THROTTLE_AXIS)
if abs(raw_throttle_brake) < 0.05:
throttle_brake = 0
elif raw_throttle_brake < 0:
throttle_brake = -(math.pow(2, abs(raw_throttle_brake) * self.BREAK_DISCOUNT) - 1)
else:
throttle_brake = math.pow(2, abs(raw_throttle_brake) * self.THROTTLE_DISCOUNT) - 1

self.takeover = (
self.joystick.get_axis(self.TAKEOVER_AXIS_2) > -0.9 or self.joystick.get_axis(self.TAKEOVER_AXIS_1) > -0.9
)

self.button_x = True if self.joystick.get_button(self.BUTTON_X_MAP) else False
self.button_y = True if self.joystick.get_button(self.BUTTON_Y_MAP) else False
Expand Down
4 changes: 1 addition & 3 deletions metadrive/examples/drive_in_single_agent_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
random_lane_num=True,
on_continuous_line_done=False,
out_of_route_done=True,
vehicle_config=dict(show_lidar=True, show_navi_mark=False),
vehicle_config=dict(show_lidar=True, show_navi_mark=False, show_line_to_navi_mark=False),
# debug=True,
# debug_static_world=True,
map=4, # seven block
Expand All @@ -45,8 +45,6 @@
interface_panel=["rgb_camera", "dashboard"]
)
)
else:
config["vehicle_config"]["show_lidar"] = True
env = MetaDriveEnv(config)
try:
o, _ = env.reset(seed=21)
Expand Down
4 changes: 2 additions & 2 deletions metadrive/manager/agent_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from metadrive.manager.base_manager import BaseAgentManager
from metadrive.policy.AI_protect_policy import AIProtectPolicy
from metadrive.policy.idm_policy import TrajectoryIDMPolicy
from metadrive.policy.manual_control_policy import ManualControlPolicy, TakeoverPolicy
from metadrive.policy.manual_control_policy import ManualControlPolicy, TakeoverPolicy, TakeoverPolicyWithoutBrake
from metadrive.policy.replay_policy import ReplayTrafficParticipantPolicy

logger = get_logger()
Expand Down Expand Up @@ -63,7 +63,7 @@ def agent_policy(self):
from metadrive.engine.engine_utils import get_global_config
# Takeover policy shares the control between RL agent (whose action is input via env.step)
# and external control device (whose action is input via controller).
if get_global_config()["agent_policy"] in [TakeoverPolicy]:
if get_global_config()["agent_policy"] in [TakeoverPolicy, TakeoverPolicyWithoutBrake]:
return get_global_config()["agent_policy"]
if get_global_config()["manual_control"]:
if get_global_config().get("use_AI_protector", False):
Expand Down
28 changes: 24 additions & 4 deletions metadrive/policy/manual_control_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

logger = get_logger()

JOYSTICK_DEADZONE = 0.025


def get_controller(controller_name, pygame_control):
"""Get the controller object.
Expand All @@ -20,9 +22,10 @@ def get_controller(controller_name, pygame_control):
controller_name = str(controller_name).lower()
if controller_name == "keyboard":
return KeyboardController(pygame_control=pygame_control)
elif controller_name in ["xboxController", "xboxcontroller", "xbox", "steering_wheel"]:
elif controller_name in ["xboxController", "xboxcontroller", "xbox", "gamepad", "joystick", "steering_wheel",
"wheel"]:
try:
if controller_name == "steering_wheel":
if controller_name in ["steering_wheel", "wheel"]:
return SteeringWheelController()
else:
return XboxController()
Expand Down Expand Up @@ -114,11 +117,28 @@ def act(self, agent_id):
# if expert_action[0]*agent_action[0]< 0 or expert_action[1]*agent_action[1] < 0:
self.takeover = True
return expert_action
elif isinstance(self.controller, KeyboardController) and abs(sum(expert_action)) > 1e-2:
elif isinstance(self.controller, KeyboardController) and (self.controller.takeover
or abs(sum(expert_action)) > 0.01):
self.takeover = True
return expert_action
elif isinstance(self.controller, XboxController) and (self.controller.button_x or self.controller.button_y):
elif isinstance(self.controller, XboxController) and (self.controller.takeover or self.controller.button_a
or self.controller.button_b or
self.controller.button_x or self.controller.button_y
or abs(sum(expert_action)) > JOYSTICK_DEADZONE):
self.takeover = True
return expert_action
self.takeover = False
return agent_action


class TakeoverPolicyWithoutBrake(TakeoverPolicy):
"""
Takeover policy shares the control between RL agent (whose action is input via env.step) and
external control device (whose action is input via controller).
Note that this policy will discard brake in human's action.
"""
def act(self, agent_id):
action = super(TakeoverPolicyWithoutBrake, self).act(agent_id=agent_id)
if self.takeover and action[1] < 0.0:
action[1] = 0.0
return action

0 comments on commit 2949cd1

Please sign in to comment.