Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: Handle ros2 uint8[] field as str without ValueError #975

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions python/mcap-ros2-support/mcap_ros2/_dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import os
import re
import base64
import binascii
from io import BytesIO
from types import SimpleNamespace
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
Expand Down Expand Up @@ -422,6 +424,12 @@ def _write_complex_type(
array: Union[List[Any], Tuple[Any], Any] = _get_property(
ros2_msg, field.name
)
if ftype.type == "uint8" and isinstance(array, str):
# Special handling for bytes in JSON
try:
array = base64.b64decode(array)
except binascii.Error:
pass
if array is None:
array = []
if (
Expand Down
34 changes: 34 additions & 0 deletions python/mcap-ros2-support/tests/test_ros2_writer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from io import BytesIO

import json
import base64

from mcap_ros2.decoder import DecoderFactory
from mcap_ros2.writer import Writer as Ros2Writer

Expand Down Expand Up @@ -59,3 +62,34 @@ def test_write_std_msgs_empty_messages():
assert msg.message.log_time == index
assert msg.message.publish_time == index
assert msg.message.sequence == index


def test_write_bytes_object_from_json_deserialization():
output = BytesIO()
ros_writer = Ros2Writer(output=output)
schema = ros_writer.register_msgdef("test_msgs/BytesMsg", "uint8[] data")

sample_bytes = bytes([0x77, 0x67, 0x65, 0x80])
to_serialize_dict = {"data": base64.b64encode(sample_bytes).decode("utf-8")}
sample_json = json.dumps(to_serialize_dict)
deserialized_dict = json.loads(sample_json)

for i in range(0, 10):
ros_writer.write_message(
topic="/test",
schema=schema,
message=deserialized_dict,
log_time=i,
publish_time=i,
sequence=i,
)
ros_writer.finish()

output.seek(0)
for index, msg in enumerate(read_ros2_messages(output)):
assert msg.channel.topic == "/test"
assert msg.schema.name == "test_msgs/BytesMsg"
assert msg.decoded_message.data == sample_bytes
assert msg.message.log_time == index
assert msg.message.publish_time == index
assert msg.message.sequence == index
Loading