Skip to content

Commit

Permalink
Add --unbuffered option to sbp2json.py (#1400)
Browse files Browse the repository at this point in the history
Add a new `--unbuffered` command line option to sbp2json.py which enables a mode where it reads and parses messages one-at-a-time. The existing behaviour (which remains the default) waits for 4096 bytes before parsing, which means that when decoding real-time streams which only output messages intermittently, there are often large pauses where no output is produced, followed by a large batch of decoded messages at once.
  • Loading branch information
dgburr authored Jan 10, 2024
1 parent 3e01928 commit 22f5e7a
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 8 deletions.
Empty file modified python/sbp/__init__.py
100755 → 100644
Empty file.
Empty file modified python/sbp/acquisition.py
100755 → 100644
Empty file.
Empty file modified python/sbp/bootload.py
100755 → 100644
Empty file.
Empty file modified python/sbp/logging.py
100755 → 100644
Empty file.
Empty file modified python/sbp/msg.py
100755 → 100644
Empty file.
Empty file modified python/sbp/navigation.py
100755 → 100644
Empty file.
Empty file modified python/sbp/piksi.py
100755 → 100644
Empty file.
65 changes: 60 additions & 5 deletions python/sbp/sbp2json.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def base_cl_options():
help="the JSON serialization library to use, default: {}".format(DEFAULT_JSON))
parser.add_argument('--include', nargs="+", type=int, default=[],
help="list of SBP message IDs to include, empty means all")
parser.add_argument("--unbuffered", action="store_true",
help="disable buffering when reading data from input (slower)")
parser.add_argument('file', nargs='?', metavar='FILE', type=argparse.FileType('rb'),
default=sys.stdin, help="the input file, stdin by default")

Expand All @@ -61,6 +63,13 @@ def get_args():
return args


# Return a (crc_read, crc_expected) pair for the SBP frame at the start of
# 'buf': crc_read is the CRC transmitted after the payload, crc_expected is
# the CRC-16/CCITT computed over the frame contents (everything after the
# preamble byte, through the end of the payload).
def get_crcs(buf, payload_len):
    payload_end = SBP_HEADER_LEN + payload_len
    # unpack_from reads the little-endian u16 in place, no slice copy needed
    crc_read, = struct.unpack_from("<H", buf, payload_end)
    crc_expected = binascii.crc_hqx(buf[1:payload_end], 0)
    return crc_read, crc_expected


def dump(args, res):
if 'json' == args.mode:
sys.stdout.write(json.dumps(res.to_json_dict(),
Expand All @@ -72,8 +81,54 @@ def dump(args, res):
sys.stdout.write("\n")


# generator to produce SBP messages from a file object
def iter_messages(fp):
# Generator which parses SBP messages from a file object.
# Messages are read one at a time from the stream, so output is produced
# as soon as each complete frame arrives (no 4096-byte batching).
# Yields (msg_type, sender, payload_len, frame_bytes, crc_read) tuples,
# where frame_bytes is the full frame: header + payload + 2 CRC bytes.
def iter_messages_unbuffered(fp):
    buf = b''

    # Grow 'buf' until it holds at least 'target_len' bytes.
    # Raises IOError when the stream reaches EOF first.
    def read_bytes_until(target_len):
        nonlocal buf
        while len(buf) < target_len:
            read_bytes = fp.read(target_len - len(buf))
            if len(read_bytes) == 0:
                raise IOError
            buf += read_bytes

    while True:
        # read header
        try:
            read_bytes_until(SBP_HEADER_LEN)
        except IOError:
            return

        # buf now contains at least SBP_HEADER_LEN bytes

        preamble, msg_type, sender, payload_len = struct.unpack("<BHHB", buf[:SBP_HEADER_LEN])

        # check preamble; on mismatch, resynchronise one byte at a time
        if preamble != SBP_PREAMBLE:
            buf = buf[1:]  # drop first byte
            continue

        # read payload and CRC (2 trailing bytes)
        try:
            read_bytes_until(SBP_HEADER_LEN + payload_len + 2)
        except IOError:
            return

        # check CRC
        crc_read, crc_expected = get_crcs(buf, payload_len)
        if crc_read == crc_expected:
            yield msg_type, sender, payload_len, buf[:SBP_HEADER_LEN + payload_len + 2], crc_read
            # Drop exactly the consumed frame: header + payload + 2 CRC bytes.
            # (Was '+ 3', which also discarded the first byte of the next
            # frame — its preamble — forcing a spurious resync and losing
            # messages in back-to-back streams. The buffered parser and the
            # yield above both use '+ 2'.)
            buf = buf[SBP_HEADER_LEN + payload_len + 2:]
        else:
            sys.stderr.write("CRC error: {} vs {} for msg type {}\n".format(crc_read, crc_expected, msg_type))
            buf = buf[1:]  # drop first byte


# Generator which parses SBP messages from a file object.
# Data is read from the stream in 4096 byte chunks.
def iter_messages_buffered(fp):
buf = memoryview(bytearray(4096))
unconsumed_offset = 0
read_offset = 0
Expand Down Expand Up @@ -114,8 +169,7 @@ def iter_messages(fp):
else:
# check CRC
b = b[:SBP_HEADER_LEN + payload_len + 2]
crc_read, = struct.unpack("<H", b[SBP_HEADER_LEN + payload_len:SBP_HEADER_LEN + payload_len + 2])
crc_expected = binascii.crc_hqx(b[1:SBP_HEADER_LEN + payload_len], 0)
crc_read, crc_expected = get_crcs(b, payload_len)
if crc_read == crc_expected:
yield msg_type, sender, payload_len, b, crc_read
consumed = SBP_HEADER_LEN + payload_len + 2
Expand All @@ -132,8 +186,9 @@ def iter_messages(fp):
def sbp_main(args):
reader = io.open(args.file.fileno(), 'rb')
include = set(args.include)
iter_fn = iter_messages_unbuffered if args.unbuffered else iter_messages_buffered

for msg_type, sender, payload_len, buf, crc_read in iter_messages(reader):
for msg_type, sender, payload_len, buf, crc_read in iter_fn(reader):
msg_buf = buf[SBP_HEADER_LEN:SBP_HEADER_LEN + payload_len]
if not include or msg_type in include:
try:
Expand Down
Empty file modified python/sbp/table.py
100755 → 100644
Empty file.
Empty file modified python/sbp/utils.py
100755 → 100644
Empty file.
20 changes: 17 additions & 3 deletions python/tests/sbp/test_sbp2json.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
TEST_DATA = os.path.join(PYTHON_ROOT, "..", "test_data", "benchmark.sbp")


# make sure that we parse exactly 150000 SBP messages from TEST_DATA
def test_sbp2json():
def get_message_count(unbuffered):
msg_count = 0
def counter(args, res):
nonlocal msg_count
Expand All @@ -27,9 +26,24 @@ def counter(args, res):
# anonymous object to emulate parsed arguments
args = type('',(object,),{
'file': open(TEST_DATA, "rb"),
'include': []
'include': [],
'unbuffered': unbuffered
})()

sbp.sbp2json.sbp_main(args)

return msg_count


# Running sbp2json in its default (buffered) mode over TEST_DATA must
# decode exactly 150000 SBP messages.
def test_sbp2json_buffered():
    assert get_message_count(False) == 150000


# Running sbp2json with --unbuffered over TEST_DATA must decode exactly
# the same 150000 SBP messages as the buffered path.
def test_sbp2json_unbuffered():
    assert get_message_count(True) == 150000

0 comments on commit 22f5e7a

Please sign in to comment.