Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Vectorize TemporalTransformManager #301

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions pytransform3d/test/test_transform_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,47 @@ def test_numpy_timeseries_transform():
assert origin_of_A_in_B_y == pytest.approx(-1.28, abs=1e-2)


def test_numpy_timeseries_transform_multiple_timestamps_query():
Copy link
Contributor

@kopytjuk kopytjuk Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since pytest is used, consider extending the existing test function using pytest.mark.parametrize:

@pytest.mark.paramertrize("query_times", [4.9, np.array([4.9, 4.9])]
def test_numpy_timeseries_transform(query_times):
    # ...
    tm.get_transform_at_time("A", "B", query_times)

Having done that, we can have a single test, testing both the scalar and array inputs.

# create entities A and B together with their transformations from world
duration = 10.0 # [s]
sample_period = 0.5 # [s]
velocity_x = 1 # [m/s]
time_A, pq_arr_A = create_sinusoidal_movement(
duration, sample_period, velocity_x, y_start_offset=0.0, start_time=0.1
)
transform_WA = NumpyTimeseriesTransform(time_A, pq_arr_A)

time_B, pq_arr_B = create_sinusoidal_movement(
duration, sample_period, velocity_x, y_start_offset=2.0,
start_time=0.35
)
transform_WB = NumpyTimeseriesTransform(time_B, pq_arr_B)

tm = TemporalTransformManager()

tm.add_transform("A", "W", transform_WA)
tm.add_transform("B", "W", transform_WB)

query_time = time_A[0] # start time
A2W_at_start = pt.transform_from_pq(pq_arr_A[0, :])
A2W_at_start_2 = tm.get_transform_at_time("A", "W", query_time)
assert_array_almost_equal(A2W_at_start, A2W_at_start_2, decimal=2)

query_times = [4.9,4.9] # [s]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR adds quite a lot vectorized mathematical operations, which IMHO need a little more unittests than testing with an array of two equal values ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starting with the nitpicking here: please also check for PEP8 with either pep8 or flake8.

A2B_at_query_times = tm.get_transform_at_time("A", "B", query_times)

origin_of_A_pos = pt.vector_to_point([0, 0, 0])
origin_of_A_in_B_pos = pt.transform(A2B_at_query_times, origin_of_A_pos)
origin_of_A_in_B_xyz = origin_of_A_in_B_pos[:-1]

origin_of_A_in_B_x, origin_of_A_in_B_y = \
origin_of_A_in_B_xyz[0], origin_of_A_in_B_xyz[1]

assert origin_of_A_in_B_x == pytest.approx(-1.11, abs=1e-2)
assert origin_of_A_in_B_y == pytest.approx(-1.28, abs=1e-2)



def test_numpy_timeseries_transform_wrong_input_shapes():
n_steps = 10
with pytest.raises(
Expand Down
270 changes: 269 additions & 1 deletion pytransform3d/trajectories.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
transform_from_exponential_coordinates,
screw_axis_from_exponential_coordinates, screw_parameters_from_screw_axis,
screw_axis_from_screw_parameters)

from .rotations import norm_angle

def invert_transforms(A2Bs):
"""Invert transforms.
Expand Down Expand Up @@ -109,6 +109,79 @@ def concat_many_to_one(A2Bs, B2C):
return np.einsum("ij,njk->nik", B2C, A2Bs)


def concat_many_to_many(A2B, B2C):
"""Concatenate multiple transformations A2B with transformation B2C.

We use the extrinsic convention, which means that B2C is left-multiplied
to A2Bs.

Parameters
----------
A2B : array-like, shape (n_transforms, 4, 4)
Transforms from frame A to frame B

B2C : array-like, shape (n_transforms, 4, 4)
Transform from frame B to frame C

Returns
-------
A2Cs : array, shape (n_transforms, 4, 4)
Transforms from frame A to frame C

See Also
--------
concat_many_to_one :
Concatenate one transformation with multiple transformations.

pytransform3d.transformations.concat :
Concatenate two transformations.
"""
return np.einsum("ijk,ikl->ijl", B2C, A2B)


def concat_dynamic(A2B, B2C):
"""Concatenate multiple transformations A2B with transformation B2C.
Comment on lines +142 to +143
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify what "dynamic" means:

Suggested change
def concat_dynamic(A2B, B2C):
"""Concatenate multiple transformations A2B with transformation B2C.
def concat_dynamic(A2B, B2C):
"""Concatenate multiple transformations A2B with transformation B2C.
This function accepts different numbers or shapes of the input transformations, e.g. 4x4 or Nx4x4 shape.


We use the extrinsic convention, which means that B2C is left-multiplied
to A2Bs. it can handle different shapes of A2B and B2C dynamically.

Parameters
----------
A2B : array-like, shape (n_transforms, 4, 4) or (4, 4)
Transforms from frame A to frame B

B2C : array-like, shape (n_transforms, 4, 4) or (4, 4)
Transform from frame B to frame C

Returns
-------
A2Cs : array, shape (n_transforms, 4, 4) or (4, 4)
Transforms from frame A to frame C

See Also
--------
concat_many_to_one :
Concatenate multiple transformations with one transformation.

concat_one_to_many :
Concatenate one transformation with multiple transformations.

concat_many_to_many :
Concatenate multiple transformations with multiple transformations.

pytransform3d.transformations.concat :
Concatenate two transformations.
"""
if B2C.ndim == 2 and A2B.ndim == 2:
return B2C.dot(A2B)
if B2C.ndim == 2 and A2B.ndim == 3:
return concat_many_to_one(A2B, B2C)
if B2C.ndim == 3 and A2B.ndim == 2:
return concat_one_to_many(A2B, B2C)
if B2C.ndim == 3 and A2B.ndim == 3:
return concat_many_to_many(A2B, B2C)


def transforms_from_pqs(P, normalize_quaternions=True):
"""Get sequence of homogeneous matrices from positions and quaternions.

Expand Down Expand Up @@ -436,6 +509,201 @@ def pqs_from_dual_quaternions(dqs):
return out



def norm_axis_angles(a):
"""Normalize axis-angle representation.

Parameters
----------
a : array-like, shape (...,4)
Axis of rotation and rotation angle: (x, y, z, angle)

Returns
-------
a : array, shape (...,4)
Axis of rotation and rotation angle: (x, y, z, angle). The length
of the axis vector is 1 and the angle is in [0, pi). No rotation
is represented by [1, 0, 0, 0].
"""
angle = a[:, 3] # Extract the angle component (N,)
norm = np.linalg.norm(a[:, :3], axis=1) # Compute the norm of the first three elements (axis)


res = np.ones_like(a)

# Create masks for elements where angle or norm is zero
zero_mask = (angle == 0.0) | (norm == 0.0)


non_zero_mask = ~zero_mask
res[non_zero_mask, :3] = a[non_zero_mask, :3] / norm[non_zero_mask, None]
Copy link
Contributor

@kopytjuk kopytjuk Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the norm is one-dimensional after you applied np.linalg.norm on a 2D (Nx4) matrix:

Suggested change
res[non_zero_mask, :3] = a[non_zero_mask, :3] / norm[non_zero_mask, None]
res[non_zero_mask, :3] = a[non_zero_mask, :3] / norm[non_zero_mask]



angle_normalized = norm_angle(angle)

negative_angle_mask = angle_normalized < 0.0
res[negative_angle_mask, :3] *= -1.0
angle_normalized[negative_angle_mask] *= -1.0

res[non_zero_mask, 3] = angle_normalized[non_zero_mask]
return res


def axis_angles_from_quaternions(qs):
"""Compute axis-angle from quaternion.

This operation is called logarithmic map.

We usually assume active rotations.

Parameters
----------
q : array-like, shape (...,4)
Unit quaternion to represent rotation: (w, x, y, z)

Returns
-------
a : array, shape (...,4)
Axis of rotation and rotation angle: (x, y, z, angle). The angle is
constrained to [0, pi) so that the mapping is unique.
"""
ps = qs[:, 1:] # Extract the vector part of the quaternion
p_norm = np.linalg.norm(ps, axis=1) # Compute the norm of the vector part

# Create a mask for quaternions where p_norm is small
small_p_norm_mask = p_norm < np.finfo(float).eps

# Initialize the output with default values for small p_norm cases
result = np.zeros_like(qs)
result[small_p_norm_mask] = np.array([1.0, 0.0, 0.0, 0.0])

# For non-zero norms, calculate axis, clamped w, and angle
non_zero_mask = ~small_p_norm_mask
axis = np.zeros_like(ps)
axis[non_zero_mask] = ps[non_zero_mask] / p_norm[non_zero_mask, None]

w_clamped = np.clip(qs[:, 0], -1.0, 1.0) # Clamp the scalar part of the quaternion
angle = 2.0 * np.arccos(w_clamped)

# Stack the axis and the angle together and normalize the axis-angle representation
result[non_zero_mask] = norm_axis_angles(np.hstack((axis[non_zero_mask], angle[non_zero_mask, None])))

return result


def screw_parameters_from_dual_quaternions(dqs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this function is non-trivial when it comes to mathematical operations, would you please add at least a unittest so that a next developer wants to optimize/change/accelerate could check her work?

"""Compute screw parameters from dual quaternion.s

Parameters
----------
dq : array-like, shape (…,8)
Unit dual quaternion to represent transform:
(pw, px, py, pz, qw, qx, qy, qz)

Returns
-------
qs : array, shape (…,3)
Vector to a point on the screw axis

s_axiss : array, shape (…,3)
Direction vector of the screw axis

hs : array, shape (…,)
Pitch of the screw. The pitch is the ratio of translation and rotation
of the screw axis. Infinite pitch indicates pure translation.

thetas : array, shape (…,)
Parameter of the transformation: theta is the angle of rotation
and h * theta the translation.
"""
reals = dqs[:, :4]
duals = dqs[:, 4:]

# Vectorized axis angle from quaternion
a = axis_angles_from_quaternions(reals) # Should handle vectorized input
s_axis = a[:, :3]
thetas = a[:, 3]

# Vectorized quaternion concatenation and translation computation
translation = 2 * batch_concatenate_quaternions(duals, batch_q_conj(reals))[:, 1:]


outer_if_mask = np.abs(thetas) < np.finfo(float).eps
outer_else_mask = np.logical_not(outer_if_mask)


ds = np.linalg.norm(translation, axis=1)
inner_if_mask = ds < np.finfo(float).eps

outer_if_inner_if_mask = np.logical_and(outer_if_mask,inner_if_mask)
outer_if_inner_else_mask = np.logical_and(outer_if_mask,np.logical_not(inner_if_mask))

# Initialize the outputs
qs = np.zeros((dqs.shape[0], 3))
thetas[outer_if_mask] = ds[outer_if_mask]
hs = np.full(dqs.shape[0], np.inf)

if np.any(outer_if_inner_if_mask):
s_axis[outer_if_inner_if_mask] = np.array([1, 0, 0])

if np.any(outer_if_inner_else_mask):
s_axis[outer_if_inner_else_mask] = translation[outer_if_inner_else_mask] / ds[outer_if_inner_else_mask]

qs = np.zeros((dqs.shape[0], 3))
thetas[outer_if_mask] = ds[outer_if_mask]
hs = np.full(dqs.shape[0], np.inf)

if np.any(outer_else_mask):
distance = np.einsum('ij,ij->i', translation[outer_else_mask], s_axis[outer_else_mask])

moment = 0.5 * (
np.cross(translation[outer_else_mask], s_axis[outer_else_mask]) +
(translation[outer_else_mask] - distance[:, None] * s_axis[outer_else_mask])
/ np.tan(0.5 * thetas[outer_else_mask])[:, None]
)

qs[outer_else_mask] = np.cross(s_axis[outer_else_mask], moment)
hs[outer_else_mask] = distance / thetas[outer_else_mask]

return qs, s_axis, hs, thetas

def dual_quaternions_from_screw_parameters(qs, s_axis, hs, thetas):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this function is non-trivial when it comes to mathematical operations, would you please add at least a unittest so that a next developer wants to optimize/change/accelerate could check her work?

ds = np.zeros_like(hs)

h_is_inf_mask = np.isinf(hs)
h_is_not_inf_mask = np.logical_not(h_is_inf_mask)

mod_thetas = thetas.copy()
if np.any(h_is_inf_mask):
ds[h_is_inf_mask] = thetas[h_is_inf_mask]
mod_thetas[h_is_inf_mask] = 0

if np.any(h_is_not_inf_mask):
ds[h_is_not_inf_mask] = hs[h_is_not_inf_mask] * thetas[h_is_not_inf_mask]

moments = np.cross(qs, s_axis)
half_distances = 0.5 * ds
sin_half_angles = np.sin(0.5 * mod_thetas)
cos_half_angles = np.cos(0.5 * mod_thetas)

real_w = cos_half_angles
real_vec = sin_half_angles[..., None] * s_axis
dual_w = -half_distances * sin_half_angles
dual_vec = (sin_half_angles[..., None] * moments +
half_distances[..., None] * cos_half_angles[..., None] * s_axis)

result = np.concatenate([real_w[..., None], real_vec, dual_w[..., None], dual_vec], axis=-1)
return result

def dual_quaternions_power(dqs,ts):
q, s_axis, h, theta = screw_parameters_from_dual_quaternions(dqs)
return dual_quaternions_from_screw_parameters(q, s_axis, h, theta * ts)

def dual_quaternions_sclerp(starts,ends,ts):
diff = batch_concatenate_dual_quaternions(batch_dq_conj(starts), ends)
return batch_concatenate_dual_quaternions(starts, dual_quaternions_power(diff, ts))


def transforms_from_dual_quaternions(dqs):
"""Get transformations from dual quaternions.

Expand Down
5 changes: 5 additions & 0 deletions pytransform3d/trajectories.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ def concat_one_to_many(
def concat_many_to_one(
A2Bs: npt.ArrayLike, B2C: npt.ArrayLike) -> np.ndarray: ...

def concat_many_to_many(
A2B: npt.ArrayLike, B2C: npt.ArrayLike) -> np.ndarray: ...

def concat_dynamic(
A2B: npt.ArrayLike, B2C: npt.ArrayLike) -> np.ndarray: ...

def transforms_from_pqs(
P: npt.ArrayLike, normalize_quaternions: bool = ...) -> np.ndarray: ...
Expand Down
Loading