support new perception_reproducer
xtk8532704 committed Sep 27, 2024
1 parent f874652 commit f37150c
Showing 7 changed files with 582 additions and 72 deletions.
perception/multi_object_tracker/package.xml (2 changes: 1 addition & 1 deletion)
@@ -24,7 +24,7 @@
<depend>tier4_autoware_utils</depend>
<depend>tier4_perception_msgs</depend>
<depend>unique_identifier_msgs</depend>

<depend>diagnostic_updater</depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>autoware_lint_common</test_depend>

@@ -0,0 +1,185 @@
#!/usr/bin/env python3

# Copyright 2023 TIER IV, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from subprocess import CalledProcessError
from subprocess import check_output
import time

from autoware_auto_perception_msgs.msg import DetectedObjects
from autoware_auto_perception_msgs.msg import PredictedObjects
from autoware_auto_perception_msgs.msg import TrackedObjects
from autoware_auto_perception_msgs.msg import TrafficSignalArray as AutoTrafficSignalArray
from autoware_perception_msgs.msg import TrafficSignalArray
from geometry_msgs.msg import PoseWithCovarianceStamped
from nav_msgs.msg import Odometry
import psutil
from rclpy.node import Node
from rclpy.serialization import deserialize_message
from rosbag2_py import StorageFilter
from rosidl_runtime_py.utilities import get_message
from sensor_msgs.msg import PointCloud2
from utils import open_reader


class PerceptionReplayerCommon(Node):
def __init__(self, args, name):
super().__init__(name)
self.args = args

self.ego_pose = None
self.rosbag_objects_data = []
self.rosbag_ego_odom_data = []
self.rosbag_traffic_signals_data = []
self.is_auto_traffic_signals = False

# subscriber
self.sub_odom = self.create_subscription(
Odometry, "/localization/kinematic_state", self.on_odom, 1
)

# publisher
if self.args.detected_object:
self.objects_pub = self.create_publisher(
DetectedObjects, "/perception/object_recognition/detection/objects", 1
)
elif self.args.tracked_object:
self.objects_pub = self.create_publisher(
TrackedObjects, "/perception/object_recognition/tracking/objects", 1
)
else:
self.objects_pub = self.create_publisher(
PredictedObjects, "/perception/object_recognition/objects", 1
)

self.pointcloud_pub = self.create_publisher(
PointCloud2, "/perception/obstacle_segmentation/pointcloud", 1
)
self.recorded_ego_pub_as_initialpose = self.create_publisher(
PoseWithCovarianceStamped, "/initialpose", 1
)

self.recorded_ego_pub = self.create_publisher(
Odometry, "/perception_reproducer/rosbag_ego_odom", 1
)

# load rosbag
print("Stared loading rosbag")
if os.path.isdir(args.bag):
for bag_file in sorted(os.listdir(args.bag)):
if bag_file.endswith(self.args.rosbag_format):
self.load_rosbag(args.bag + "/" + bag_file)
else:
self.load_rosbag(args.bag)
print("Ended loading rosbag")

        # temporarily support the old autoware_auto_perception_msgs traffic signal type
if self.is_auto_traffic_signals:
self.auto_traffic_signals_pub = self.create_publisher(
AutoTrafficSignalArray, "/perception/traffic_light_recognition/traffic_signals", 1
)
else:
self.traffic_signals_pub = self.create_publisher(
TrafficSignalArray, "/perception/traffic_light_recognition/traffic_signals", 1
)

        # wait until publishers/subscribers are ready
time.sleep(1.0)

def on_odom(self, odom):
self.ego_pose = odom.pose.pose

def load_rosbag(self, rosbag2_path: str):
reader = open_reader(str(rosbag2_path))

topic_types = reader.get_all_topics_and_types()
# Create a map for quicker lookup
        type_map = {topic_type.name: topic_type.type for topic_type in topic_types}

objects_topic = (
"/perception/object_recognition/detection/objects"
if self.args.detected_object
else "/perception/object_recognition/tracking/objects"
if self.args.tracked_object
else "/perception/object_recognition/objects"
)
ego_odom_topic = "/localization/kinematic_state"
traffic_signals_topic = "/perception/traffic_light_recognition/traffic_signals"
topic_filter = StorageFilter(topics=[objects_topic, ego_odom_topic, traffic_signals_topic])
reader.set_filter(topic_filter)

while reader.has_next():
(topic, data, stamp) = reader.read_next()
msg_type = get_message(type_map[topic])
msg = deserialize_message(data, msg_type)
if topic == objects_topic:
self.rosbag_objects_data.append((stamp, msg))
if topic == ego_odom_topic:
self.rosbag_ego_odom_data.append((stamp, msg))
if topic == traffic_signals_topic:
self.rosbag_traffic_signals_data.append((stamp, msg))
self.is_auto_traffic_signals = (
"autoware_auto_perception_msgs" in type(msg).__module__
)

def kill_online_perception_node(self):
# kill node if required
kill_process_name = None
if self.args.detected_object:
kill_process_name = "dummy_perception_publisher_node"
elif self.args.tracked_object:
kill_process_name = "multi_object_tracker"
else:
kill_process_name = "map_based_prediction"
if kill_process_name:
try:
pid = check_output(["pidof", kill_process_name])
process = psutil.Process(int(pid[:-1]))
process.terminate()
except CalledProcessError:
pass

    def binary_search(self, data, timestamp):
        # Return the recorded message at or just after the timestamp, clamped to the ends of the data.
        if not data:
            return None
        if data[-1][0] < timestamp:
return data[-1][1]
elif data[0][0] > timestamp:
return data[0][1]

low, high = 0, len(data) - 1

while low <= high:
mid = (low + high) // 2
if data[mid][0] < timestamp:
low = mid + 1
elif data[mid][0] > timestamp:
high = mid - 1
else:
return data[mid][1]

# Return the next timestamp's data if available
if low < len(data):
return data[low][1]
return None

def find_topics_by_timestamp(self, timestamp):
objects_data = self.binary_search(self.rosbag_objects_data, timestamp)
traffic_signals_data = self.binary_search(self.rosbag_traffic_signals_data, timestamp)
return objects_data, traffic_signals_data

def find_ego_odom_by_timestamp(self, timestamp):
return self.binary_search(self.rosbag_ego_odom_data, timestamp)
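
Usage note (not part of this commit): PerceptionReplayerCommon is meant to be subclassed by the replayer/reproducer scripts, which look up the recorded messages by timestamp and republish them. The sketch below is illustrative only; the subclass name, the argument flags, the 0.1 s timer period, and the perception_replayer_common module name in the import are assumptions inferred from this file.

#!/usr/bin/env python3
# Illustrative sketch only (not part of this commit): the subclass name,
# argument flags, and the 0.1 s timer period are assumptions.
import argparse

import rclpy
from perception_replayer_common import PerceptionReplayerCommon


class MinimalReplayer(PerceptionReplayerCommon):
    def __init__(self, args):
        super().__init__(args, "minimal_replayer")
        self.timer = self.create_timer(0.1, self.on_timer)

    def on_timer(self):
        # A full replayer maps the current time into the bag's time range
        # (e.g. with a start-time offset); binary_search simply clamps to the
        # last recorded message otherwise.
        timestamp = self.get_clock().now().nanoseconds
        objects_msg, _ = self.find_topics_by_timestamp(timestamp)
        if objects_msg:
            self.objects_pub.publish(objects_msg)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-b", "--bag", required=True)
    parser.add_argument("-d", "--detected-object", action="store_true")
    parser.add_argument("-t", "--tracked-object", action="store_true")
    parser.add_argument("-f", "--rosbag-format", default="db3")
    rclpy.init()
    rclpy.spin(MinimalReplayer(parser.parse_args()))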
@@ -0,0 +1,152 @@
#!/usr/bin/env python3

# Copyright 2023 TIER IV, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import time

from geometry_msgs.msg import Quaternion
import numpy as np
import rosbag2_py
from sensor_msgs.msg import PointCloud2
from sensor_msgs.msg import PointField
from tf_transformations import euler_from_quaternion
from tf_transformations import quaternion_from_euler


def get_rosbag_options(path, serialization_format="cdr"):
storage_options = rosbag2_py.StorageOptions(uri=path, storage_id="sqlite3")

converter_options = rosbag2_py.ConverterOptions(
input_serialization_format=serialization_format,
output_serialization_format=serialization_format,
)

return storage_options, converter_options


def open_reader(path: str):
storage_options, converter_options = get_rosbag_options(path)
reader = rosbag2_py.SequentialReader()
reader.open(storage_options, converter_options)
return reader


def calc_squared_distance(p1, p2):
    # Note: despite its name, this returns the (non-squared) 2D Euclidean distance.
    return math.sqrt((p1.x - p2.x) ** 2 + (p1.y - p2.y) ** 2)


def create_empty_pointcloud(timestamp):
pointcloud_msg = PointCloud2()
pointcloud_msg.header.stamp = timestamp
pointcloud_msg.header.frame_id = "map"
pointcloud_msg.height = 1
pointcloud_msg.is_dense = True
pointcloud_msg.point_step = 16
field_name_vec = ["x", "y", "z"]
offset_vec = [0, 4, 8]
for field_name, offset in zip(field_name_vec, offset_vec):
field = PointField()
field.name = field_name
field.offset = offset
        field.datatype = 7  # PointField.FLOAT32
field.count = 1
pointcloud_msg.fields.append(field)
return pointcloud_msg


def get_yaw_from_quaternion(orientation):
orientation_list = [
orientation.x,
orientation.y,
orientation.z,
orientation.w,
]
return euler_from_quaternion(orientation_list)[2]


def get_quaternion_from_yaw(yaw):
q = quaternion_from_euler(0, 0, yaw)
orientation = Quaternion()
orientation.x = q[0]
orientation.y = q[1]
orientation.z = q[2]
orientation.w = q[3]
return orientation


def translate_objects_coordinate(ego_pose, log_ego_pose, objects_msg):
    """Re-express each object pose from the logged ego frame into the current (simulation) ego frame."""
log_ego_yaw = get_yaw_from_quaternion(log_ego_pose.orientation)
log_ego_pose_trans_mat = np.array(
[
[
math.cos(log_ego_yaw),
-math.sin(log_ego_yaw),
log_ego_pose.position.x,
],
[math.sin(log_ego_yaw), math.cos(log_ego_yaw), log_ego_pose.position.y],
[0.0, 0.0, 1.0],
]
)

ego_yaw = get_yaw_from_quaternion(ego_pose.orientation)
ego_pose_trans_mat = np.array(
[
[math.cos(ego_yaw), -math.sin(ego_yaw), ego_pose.position.x],
[math.sin(ego_yaw), math.cos(ego_yaw), ego_pose.position.y],
[0.0, 0.0, 1.0],
]
)

for o in objects_msg.objects:
log_object_pose = o.kinematics.pose_with_covariance.pose
log_object_yaw = get_yaw_from_quaternion(log_object_pose.orientation)
log_object_pos_vec = np.array([log_object_pose.position.x, log_object_pose.position.y, 1.0])

# translate object pose from ego pose in log to ego pose in simulation
object_pos_vec = np.linalg.inv(ego_pose_trans_mat).dot(
log_ego_pose_trans_mat.dot(log_object_pos_vec.T)
)

object_pose = o.kinematics.pose_with_covariance.pose
object_pose.position.x = object_pos_vec[0]
object_pose.position.y = object_pos_vec[1]
object_pose.orientation = get_quaternion_from_yaw(log_object_yaw + log_ego_yaw - ego_yaw)


class StopWatch:
def __init__(self, verbose):
# A dictionary to store the starting times
self.start_times = {}
self.verbose = verbose

def tic(self, name):
"""Store the current time with the given name."""
self.start_times[name] = time.perf_counter()

def toc(self, name):
"""Print the elapsed time since the last call to tic() with the same name."""
if name not in self.start_times:
print(f"No start time found for {name}!")
return

elapsed_time = (
time.perf_counter() - self.start_times[name]
) * 1000 # Convert to milliseconds
if self.verbose:
print(f"Time for {name}: {elapsed_time: .2f} ms")

# Reset the starting time for the name
del self.start_times[name]
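
For reference, a small standalone sketch (not part of this commit; the poses are made-up values) of how translate_objects_coordinate and StopWatch fit together. The helper re-expresses each object pose, given relative to the ego pose recorded in the bag, relative to the current ego pose in simulation, so an object keeps its position on the map while the reference frame changes.

# Illustrative sketch only (not part of this commit); the poses below are made-up values.
from autoware_auto_perception_msgs.msg import DetectedObject
from autoware_auto_perception_msgs.msg import DetectedObjects
from geometry_msgs.msg import Pose
from utils import StopWatch
from utils import get_quaternion_from_yaw
from utils import translate_objects_coordinate

# Ego pose recorded in the bag vs. the current ego pose in simulation (2 m further ahead).
log_ego_pose = Pose()
log_ego_pose.position.x = 10.0
log_ego_pose.orientation = get_quaternion_from_yaw(0.0)
ego_pose = Pose()
ego_pose.position.x = 12.0
ego_pose.orientation = get_quaternion_from_yaw(0.0)

# One object logged 5 m ahead of the recorded ego (pose relative to that ego).
objects_msg = DetectedObjects()
obj = DetectedObject()
obj.kinematics.pose_with_covariance.pose.position.x = 5.0
obj.kinematics.pose_with_covariance.pose.orientation = get_quaternion_from_yaw(0.0)
objects_msg.objects.append(obj)

stopwatch = StopWatch(verbose=True)
stopwatch.tic("translate")
translate_objects_coordinate(ego_pose, log_ego_pose, objects_msg)
stopwatch.toc("translate")  # prints "Time for translate: ... ms"

# The object keeps its map position, so it is now 3 m ahead of the simulated ego.
print(objects_msg.objects[0].kinematics.pose_with_covariance.pose.position.x)  # 3.0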