# Copyright (c) 2022 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Tutorial to show how to use Spot to open a door.
The robot should start sitting on the ground, facing the door, approximately 1 meter away.
The robot should be powered off.
The use of an external estop client is required.
"""
import argparse
import math
import sys
import time
import cv2
import numpy as np
from bosdyn import geometry
from bosdyn.api import basic_command_pb2, geometry_pb2, manipulation_api_pb2
from bosdyn.api.manipulation_api_pb2 import (ManipulationApiFeedbackRequest, ManipulationApiRequest,
WalkToObjectInImage)
from bosdyn.api.spot import door_pb2
from bosdyn.client import create_standard_sdk, frame_helpers
from bosdyn.client.door import DoorClient
from bosdyn.client.image import ImageClient
from bosdyn.client.lease import LeaseClient, LeaseKeepAlive
from bosdyn.client.manipulation_api_client import ManipulationApiClient
from bosdyn.client.robot_command import RobotCommandBuilder, RobotCommandClient, blocking_stand
from bosdyn.client.util import add_base_arguments, authenticate, setup_logging
def safe_power_off(robot):
"""Sit and power off robot.
Args:
robot: (Robot) Interface to Spot robot.
"""
robot.logger.info("Powering off robot...")
robot.power_off(cut_immediately=False, timeout_sec=20)
assert not robot.is_powered_on(), "Robot power off failed."
robot.logger.info("Robot safely powered off.")


def walk_to_object_in_image(robot, request_manager):
    """Command the robot to walk toward a user-selected point. The WalkToObjectInImage feedback
    reports a raycast result, converting the 2D touchpoint to a 3D location in space.

    Args:
        robot: (Robot) Interface to Spot robot.
        request_manager: (RequestManager) Object for bookkeeping user touch points.

    Returns:
        ManipulationApiResponse: Feedback from WalkToObjectInImage request.
    """
    manip_client = robot.ensure_client(ManipulationApiClient.default_service_name)
    manipulation_api_request = request_manager.get_walk_to_object_in_image_request()

    # Send a manipulation API request. Using the point selected by the user, the robot will
    # walk toward the door handle.
    robot.logger.info("Walking toward door...")
    response = manip_client.manipulation_api_command(manipulation_api_request)

    # Poll feedback to verify the robot walks to the handle. The service will also return a
    # FrameTreeSnapshot that contains a walkto_raycast_intersection point.
    command_id = response.manipulation_cmd_id
    feedback_request = ManipulationApiFeedbackRequest(manipulation_cmd_id=command_id)
    timeout_sec = 15.0
    end_time = time.time() + timeout_sec
    while time.time() < end_time:
        response = manip_client.manipulation_api_feedback_command(feedback_request)
        assert response.manipulation_cmd_id == command_id, "Got feedback for wrong command."
        if response.current_state == manipulation_api_pb2.MANIP_STATE_DONE:
            robot.logger.info("Walked to door.")
            return response
        time.sleep(0.25)
    raise Exception("Manip command timed out. Try repositioning the robot.")
def get_images_as_cv2(robot, sources):
"""Request image sources from robot. Decode and store as OpenCV image as well as proto.
Args:
robot: (Robot) Interface to Spot robot.
sources: (list) String names of image sources.
Returns:
dict: Dictionary from image source name to (image proto, CV2 image) pairs.
"""
image_client = robot.ensure_client(ImageClient.default_service_name)
image_responses = image_client.get_image_from_sources(sources)
image_dict = dict()
for response in image_responses:
# Convert image proto to CV2 image, for display later.
image = np.frombuffer(response.shot.image.data, dtype=np.uint8)
image = cv2.imdecode(image, -1)
image_dict[response.source.name] = (response, image)
return image_dict
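

# A sketch of how the returned dictionary is consumed (hypothetical single-source call):
#   image_dict = get_images_as_cv2(robot, ["frontleft_fisheye_image"])
#   response, cv_image = image_dict["frontleft_fisheye_image"]
# Here "response" is the full ImageResponse proto and "cv_image" is the decoded numpy array.
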
class RequestManager:
"""Helper object for displaying side by side images to the user and requesting user selected
touchpoints. This class handles the bookkeeping for converting between a touchpoints of side by
side display image of the frontleft and frontright fisheye images and the individual images.
Args:
image_dict: (dict) Dictionary from image source name to (image proto, CV2 image) pairs.
window_name: (str) Name of display window.
"""
def __init__(self, image_dict, window_name, source, x, y, image):
self.image_dict = image_dict
self.window_name = window_name
self.handle_position_side_by_side = None
self.hinge_position_side_by_side = None
self._side_by_side = None
self.clicked_source = source
self.image_x = x
self.image_y = y
self.image = image
@property
def side_by_side(self):
"""cv2.Image: Side by side rotated frontleft and frontright fisheye images"""
if self._side_by_side is not None:
return self._side_by_side
        # Grab the decoded OpenCV (numpy) images for processing.
fr_fisheye_image = self.image_dict['frontright_fisheye_image'][1]
fl_fisheye_image = self.image_dict['frontleft_fisheye_image'][1]
# Rotate the images to align with robot Z axis.
fr_fisheye_image = cv2.rotate(fr_fisheye_image, cv2.ROTATE_90_CLOCKWISE)
fl_fisheye_image = cv2.rotate(fl_fisheye_image, cv2.ROTATE_90_CLOCKWISE)
self._side_by_side = np.hstack([fr_fisheye_image, fl_fisheye_image])
return self._side_by_side
def user_input_set(self):
"""bool: True if handle and hinge position set."""
        return (self.handle_position_side_by_side is not None and
                self.hinge_position_side_by_side is not None)
def get_walk_to_object_in_image_request(self):
"""Convert from touchpoints in side by side image to a WalkToObjectInImage request.
Optionally show debug image of touch point.
Args:
debug (bool): Show intermediate debug image.
Returns:
ManipulationApiRequest: Request with WalkToObjectInImage info populated.
"""
        # In the interactive version, the clicked source is derived from which half of the
        # side-by-side image the user touched; here it is passed to the constructor instead.
        # height, width = self.side_by_side.shape
        # if self.handle_position_side_by_side[0] > width / 2:
        #     self.clicked_source = "frontleft_fisheye_image"
        #     rotated_pixel = self.handle_position_side_by_side
        #     rotated_pixel = (rotated_pixel[0] - width / 2, rotated_pixel[1])
        # else:
        #     self.clicked_source = "frontright_fisheye_image"
        #     rotated_pixel = self.handle_position_side_by_side
        # Undo the pixel rotation by rotating 90 deg CCW.
manipulation_cmd = WalkToObjectInImage()
manipulation_cmd.pixel_xy.x = self.image_x
manipulation_cmd.pixel_xy.y = self.image_y
# Optionally show debug image.
# if debug:
# clicked_cv2 = self.image_dict[self.clicked_source][1]
# c = (255, 0, 0)
# cv2.circle(clicked_cv2,
# (int(manipulation_cmd.pixel_xy.x), int(manipulation_cmd.pixel_xy.y)), 30, c,
# 5)
# cv2.imshow("Debug", clicked_cv2)
# cv2.waitKey(0)
# cv2.destroyAllWindows()
# Populate the rest of the Manip API request.
clicked_image_proto = self.image
manipulation_cmd.frame_name_image_sensor = clicked_image_proto.shot.frame_name_image_sensor
manipulation_cmd.transforms_snapshot_for_camera.CopyFrom(
clicked_image_proto.shot.transforms_snapshot)
manipulation_cmd.camera_model.CopyFrom(clicked_image_proto.source.pinhole)
door_search_dist_meters = 1.25
manipulation_cmd.offset_distance.value = door_search_dist_meters
request = ManipulationApiRequest(walk_to_object_in_image=manipulation_cmd)
return request
@property
def vision_tform_sensor(self):
"""Look up vision_tform_sensor for sensor which user clicked.
Returns:
math_helpers.SE3Pose
"""
clicked_image_proto = self.image_dict[self.clicked_source][0]
frame_name_image_sensor = clicked_image_proto.shot.frame_name_image_sensor
snapshot = clicked_image_proto.shot.transforms_snapshot
return frame_helpers.get_a_tform_b(snapshot, frame_helpers.VISION_FRAME_NAME,
frame_name_image_sensor)
@property
def hinge_side(self):
"""Calculate if hinge is on left or right side of door based on user touchpoints.
Returns:
DoorCommand.HingeSide
"""
# handle_x = self.handle_position_side_by_side[0]
# hinge_x = self.hinge_position_side_by_side[0]
# if handle_x < hinge_x:
# hinge_side = door_pb2.DoorCommand.HINGE_SIDE_RIGHT
# else:
# hinge_side = door_pb2.DoorCommand.HINGE_SIDE_LEFT
return door_pb2.DoorCommand.HINGE_SIDE_LEFT
def _draw_text_on_image(image, text):
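    """Overlay `text` on a filled white box near the bottom-left corner of `image`, in place."""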
font_scale = 4
thickness = 4
font = cv2.FONT_HERSHEY_PLAIN
(text_width, text_height) = cv2.getTextSize(text, font, fontScale=font_scale,
thickness=thickness)[0]
rectangle_bgr = (255, 255, 255)
text_offset_x = 10
text_offset_y = image.shape[0] - 25
border = 10
box_coords = ((text_offset_x - border, text_offset_y + border),
(text_offset_x + text_width + border, text_offset_y - text_height - border))
cv2.rectangle(image, box_coords[0], box_coords[1], rectangle_bgr, cv2.FILLED)
cv2.putText(image, text, (text_offset_x, text_offset_y), font, fontScale=font_scale,
color=(0, 0, 0), thickness=thickness)
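

# Not called in this trimmed helper; a usage sketch for prompting the user on the stitched image:
#   _draw_text_on_image(request_manager.side_by_side, "Click the door handle")
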
def open_door(robot, request_manager, snapshot):
"""Command the robot to automatically open a door via the door service API.
Args:
robot: (Robot) Interface to Spot robot.
request_manager: (RequestManager) Object for bookkeeping user touch points.
snapshot: (TransformSnapshot) Snapshot from the WalkToObjectInImage command which contains
the 3D location reported from a raycast based on the user hinge touch point.
"""
robot.logger.info("Opening door...")
    # Use the raycast intersection point and the sensor position to compute the ray from the
    # camera to the handle point on the door.
vision_tform_raycast = frame_helpers.get_a_tform_b(snapshot, frame_helpers.VISION_FRAME_NAME,
frame_helpers.RAYCAST_FRAME_NAME)
vision_tform_sensor = request_manager.vision_tform_sensor
raycast_point_wrt_vision = vision_tform_raycast.get_translation()
ray_from_camera_to_object = raycast_point_wrt_vision - vision_tform_sensor.get_translation()
ray_from_camera_to_object_norm = np.sqrt(np.sum(ray_from_camera_to_object**2))
ray_from_camera_normalized = ray_from_camera_to_object / ray_from_camera_to_object_norm
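    # Construct a short search ray centered on the raycast hit: the door service will look for a
    # graspable handle along the segment from (hit - search_dist) to (hit + search_dist) measured
    # along the camera-to-object direction computed above.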
auto_cmd = door_pb2.DoorCommand.AutoGraspCommand()
auto_cmd.frame_name = frame_helpers.VISION_FRAME_NAME
search_dist_meters = 0.25
search_ray = search_dist_meters * ray_from_camera_normalized
search_ray_start_in_frame = raycast_point_wrt_vision - search_ray
auto_cmd.search_ray_start_in_frame.CopyFrom(
geometry_pb2.Vec3(x=search_ray_start_in_frame[0], y=search_ray_start_in_frame[1],
z=search_ray_start_in_frame[2]))
print("search")
search_ray_end_in_frame = raycast_point_wrt_vision + search_ray
auto_cmd.search_ray_end_in_frame.CopyFrom(
geometry_pb2.Vec3(x=search_ray_end_in_frame[0], y=search_ray_end_in_frame[1],
z=search_ray_end_in_frame[2]))
auto_cmd.hinge_side = request_manager.hinge_side
auto_cmd.swing_direction = door_pb2.DoorCommand.SWING_DIRECTION_UNKNOWN
door_command = door_pb2.DoorCommand.Request(auto_grasp_command=auto_cmd)
request = door_pb2.OpenDoorCommandRequest(door_command=door_command)
# Command the robot to open the door.
door_client = robot.ensure_client(DoorClient.default_service_name)
response = door_client.open_door(request)
feedback_request = door_pb2.OpenDoorFeedbackRequest()
feedback_request.door_command_id = response.door_command_id
timeout_sec = 60.0
end_time = time.time() + timeout_sec
while time.time() < end_time:
feedback_response = door_client.open_door_feedback(feedback_request)
        if (feedback_response.status !=
                basic_command_pb2.RobotCommandFeedbackStatus.STATUS_PROCESSING):
            raise Exception(f"Door command failed with status {feedback_response.status}.")
if (feedback_response.feedback.status == door_pb2.DoorCommand.Feedback.STATUS_COMPLETED):
robot.logger.info("Opened door.")
return
time.sleep(0.5)
raise Exception("Door command timed out. Try repositioning the robot.")


def execute_open_door(robot, image_sources, source, x, y, image):
    """High level behavior sequence for commanding the robot to open a door."""
    # Capture images from the two front cameras.
    image_dict = get_images_as_cv2(robot, image_sources)
# Get handle and hinge locations from user input.
window_name = "Open Door Example"
request_manager = RequestManager(image_dict, window_name, source, x, y, image)
# request_manager.get_user_input_handle_and_hinge()
# assert request_manager.user_input_set(), "Failed to get user input for handle and hinge."
# Tell the robot to walk toward the door.
manipulation_feedback = walk_to_object_in_image(robot, request_manager)
time.sleep(3.0)
    # The ManipulationApiResponse for the WalkToObjectInImage command returns a transform
    # snapshot that records where the user-selected door handle point intersects the world.
    # We use this intersection point to execute the door command.
snapshot = manipulation_feedback.transforms_snapshot_manipulation_data
# Execute the door command.
open_door(robot, request_manager, snapshot)
# Safely power off the robot, which sits then cuts motor power.
safe_power_off(robot)
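

# A minimal driver sketch, not part of the original helper: it assumes the standard SDK
# bootstrapping flow (authenticate, lease, power on, stand), and the handle source name and
# pixel coordinates below are hypothetical placeholders; in practice they would come from
# user input or a handle detector. An external E-Stop client must already be registered.
def main():
    parser = argparse.ArgumentParser(description=__doc__)
    add_base_arguments(parser)
    options = parser.parse_args()
    setup_logging(options.verbose)

    sdk = create_standard_sdk("DoorOpeningClient")
    robot = sdk.create_robot(options.hostname)
    authenticate(robot)
    robot.time_sync.wait_for_sync()
    assert not robot.is_estopped(), "Robot is estopped. Use an external E-Stop client."

    sources = ["frontright_fisheye_image", "frontleft_fisheye_image"]
    lease_client = robot.ensure_client(LeaseClient.default_service_name)
    with LeaseKeepAlive(lease_client, must_acquire=True, return_at_exit=True):
        robot.power_on(timeout_sec=20)
        assert robot.is_powered_on(), "Robot power on failed."
        blocking_stand(robot.ensure_client(RobotCommandClient.default_service_name))

        # Hypothetical handle selection: pick a pixel in the frontleft fisheye image.
        image_dict = get_images_as_cv2(robot, sources)
        source = "frontleft_fisheye_image"
        x, y = 320, 240
        image = image_dict[source][0]
        execute_open_door(robot, sources, source, x, y, image)


if __name__ == "__main__":
    main()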