-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implemented detailed error messages.
- Loading branch information
Showing
14 changed files
with
591 additions
and
118 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
r""" | ||
Messages for DAG-CBOR decoding errors. | ||
""" | ||
|
||
import math | ||
from typing_extensions import Literal | ||
|
||
from multiformats import varint | ||
|
||
from ..encoding import EncodableType, _dag_cbor_code | ||
from ..utils import CBORDecodingError | ||
from ._stream import Stream, StreamSnapshot | ||
from ._err_utils import _TRUNC_BYTES, _bytes2hex, _decode_error_lines, _decode_error_msg, _extract_error_cause_lines, _cid_error_template | ||
|
||
def _required_multicodec(stream: Stream) -> str:
    r""" Error message for a stream that does not start with the 'dag-cbor' multicodec code. """
    snapshot = stream.curr_snapshot
    expected_bytes = varint.encode(_dag_cbor_code)
    plural = "s" if snapshot.latest_read_size > 1 else ""
    details = f"byte{plural} should be 0x{expected_bytes.hex()}."
    return _decode_error_msg("Required 'dag-cbor' multicodec code.", snapshot, details=details)
|
||
def _multiple_top_level_items(stream: Stream) -> str:
    r""" Error message for a second top-level CBOR object found in the stream. """
    return _decode_error_msg(
        "Encode and decode must operate on a single top-level CBOR object.",
        stream.curr_snapshot,
        details="unexpected start byte of a second top-level CBOR object",
    )
|
||
def _invalid_float(stream: Stream, arg: float) -> str:
    r""" Error message for a float value (NaN or +/-Infinity) not allowed by DAG-CBOR. """
    if math.isnan(arg):
        msg = "NaN is not an allowed float value."
        float_str = "float('NaN')"
    else:
        assert math.isinf(arg), "Float must be NaN or infinite."
        sign = "" if arg > 0 else "-"
        msg = f"{sign}Infinity is not an allowed float value."
        float_str = f"float('{sign}Infinity')"
    # hl_start=1 skips the data item head byte in the highlight.
    return _decode_error_msg(msg, stream.curr_snapshot,
                             details=f"struct.pack('>d', {float_str})", hl_start=1)
|
||
def _unexpected_eof(stream: Stream, what: str, n: int, include_prev_snapshot: bool = True) -> str:
    r""" Error message for a stream that ended before the expected number of bytes was read. """
    curr = stream.curr_snapshot
    if include_prev_snapshot:
        prev = stream.prev_snapshot
        snapshots = [prev, curr]
    else:
        # An empty snapshot keeps the highlight start at 0.
        prev = StreamSnapshot(bytes(), 0)
        snapshots = [curr]
    details = f"{curr.latest_read_size} bytes read, out of {n} expected."
    return _decode_error_msg(f"Unexpected EOF while attempting to read {what}.",
                             *snapshots, details=details, eof=True,
                             hl_start=prev.latest_read_size)
|
||
def _invalid_additional_info(stream: Stream, additional_info: int, major_type: int) -> str:
    r""" Error message for an invalid additional info value in a data item head. """
    # Major type 0x7 additionally disallows values 24-26.
    if major_type == 0x7:
        expected = f"expected from {0:0>5b} to {23:0>5b}, or {27:0>5b}."
    else:
        expected = f"expected from {0:0>5b} to {27:0>5b}."
    msg = f"Invalid additional info {additional_info} in data item head for major type 0x{major_type:x}."
    return _decode_error_msg(msg, stream.curr_snapshot,
                             details=f"lower 5 bits are {additional_info:0>5b}, "+expected)
|
||
def _excessive_int_size(stream: Stream, arg: int, bytes_used: int, bytes_sufficient: int) -> str:
    r""" Error message for an integer encoded with more bytes than canonically required. """
    plural = 's' if bytes_sufficient > 1 else ''
    msg = (f"Integer {arg} was encoded using {bytes_used} bytes, "
           f"while {bytes_sufficient} byte{plural} would have been enough.")
    details = f"same as byte{plural} 0x{arg:0>{2*bytes_sufficient}x}"
    # hl_start=1 skips the data item head byte in the highlight.
    return _decode_error_msg(msg, stream.prev_snapshot, stream.curr_snapshot,
                             details=details, hl_start=1)
|
||
def _unicode(stream: Stream, length: int, start: int, end: int, reason: str) -> str:
    r"""
    Error message for string bytes that are not valid utf-8.

    :param length: declared length of the string being decoded.
    :param start: start index of the invalid byte span — presumably relative to the
        latest read chunk; TODO confirm against callers.
    :param end: end index of the invalid byte span (same base as ``start``).
    :param reason: utf-8 decoding failure reason, shown under the invalid bytes.
    """
    prev_snapshot = stream.prev_snapshot
    curr_snapshot = stream.curr_snapshot
    msg = "String bytes are not valid utf-8 bytes."
    lines = [msg]
    n = curr_snapshot.latest_read_size
    # Padding for the second byte line; only applied when the chunk is short enough
    # that _bytes2hex will not truncate it (otherwise columns would not line up).
    ps = 0
    pe = 0
    if n <= _TRUNC_BYTES:
        ps = start
        pe = n-end
    str_details = f"string of length {length}"
    # First line: the whole string read (head highlighted), labelled with its length.
    lines.extend(_decode_error_lines(prev_snapshot, curr_snapshot, details=str_details, hl_len=1))
    # Second line: only the invalid span, padded so it sits under the matching bytes above
    # (pad includes the head bytes shown from prev_snapshot on the first line).
    lines.extend(_decode_error_lines(curr_snapshot, details=reason, start=start, end=end, pad_start=ps+prev_snapshot.latest_read_size, pad_end=pe))
    return "\n".join(lines)
|
||
def _list_item(list_head_snapshot: StreamSnapshot, idx: int, length: int, e: CBORDecodingError) -> str:
    r""" Error message for a list item that failed to decode, with the cause appended. """
    head_lines = _decode_error_lines(list_head_snapshot, details=f"list of length {length}", dots=True)
    parts = ["Error while decoding list."]
    parts.extend(head_lines)
    parts.append(f"Error occurred while decoding item at position {idx}: further details below.")
    parts.extend(_extract_error_cause_lines(e))
    return "\n".join(parts)
|
||
def _dict_key_type(stream: Stream, major_type: int) -> str:
    r""" Error message for a dictionary key that is not of string type. """
    details = f"major type is {hex(major_type)}, should be 0x3 (string) instead."
    return _decode_error_msg("Dictionary key is not of string type.",
                             stream.curr_snapshot, details=details, hl_len=1, dots=True)
|
||
def _dict_item(dict_head_snapshot: StreamSnapshot, item: Literal["key", "value"], idx: int, length: int, e: CBORDecodingError) -> str:
    r""" Error message for a dict key or value that failed to decode, with the cause appended. """
    parts = ["Error while decoding dict."]
    parts.extend(_decode_error_lines(dict_head_snapshot, details=f"dict of length {length}", dots=True))
    parts.append(f"Error occurred while decoding {item} at position {idx}: further details below.")
    parts.extend(_extract_error_cause_lines(e))
    return "\n".join(parts)
|
||
def _duplicate_dict_key(dict_head_snapshot: StreamSnapshot, stream: Stream, k: str, idx: int, length: int) -> str:
    r""" Error message for a duplicate dictionary key. """
    parts = ["Error while decoding dict."]
    parts.extend(_decode_error_lines(dict_head_snapshot, details=f"dict of length {length}", dots=True))
    parts.append(f"Duplicate key is found at position {idx}.")
    parts.extend(_decode_error_lines(stream.curr_snapshot, details=f"decodes to key {repr(k)}"))
    return "\n".join(parts)
|
||
def _dict_key_order(dict_head_snapshot: StreamSnapshot, kb0: bytes, idx0: int, kb1: bytes, idx1: int, length: int) -> str:
    r""" Error message for dictionary keys that violate canonical ordering. """
    # pylint: disable = too-many-arguments
    # Right-align both positions to the width of the larger index.
    width = max(len(str(idx0)), len(str(idx1)))
    parts = ["Error while decoding dict."]
    parts.extend(_decode_error_lines(dict_head_snapshot, details=f"dict of length {length}", dots=True))
    parts.append("Dictionary keys not in canonical order.")
    parts.append(f"  Key at pos #{idx0: >{width}}: {_bytes2hex(kb0)}")
    parts.append(f"  Key at pos #{idx1: >{width}}: {_bytes2hex(kb1)}")
    return "\n".join(parts)
|
||
def _invalid_tag(stream: Stream, arg: int) -> str:
    r""" Error message for a major type 0x6 tag other than 42 (CID). """
    prev = stream.prev_snapshot
    # Highlight starts after the bytes belonging to the previous snapshot.
    return _decode_error_msg("Error while decoding item of major type 0x6: only tag 42 is allowed.",
                             prev, stream.curr_snapshot,
                             details=f"tag {arg}", hl_start=prev.latest_read_size)
|
||
def _cid(cid_head_snapshots: tuple[StreamSnapshot, StreamSnapshot], e: CBORDecodingError) -> str:
    r""" Error message for CID data that failed to decode, with the cause appended. """
    cause_lines = _extract_error_cause_lines(e)
    return _cid_error_template(cid_head_snapshots, *cause_lines)
|
||
def _cid_bytes(cid_head_snapshots: tuple[StreamSnapshot, StreamSnapshot], stream: Stream, cid_bytes: EncodableType) -> str:
    r""" Error message for CID data that decoded to an item other than bytes. """
    type_name = type(cid_bytes).__name__
    details = f"decodes to an item of type {repr(type_name)}"
    return _cid_error_template(cid_head_snapshots,
                               "CID bytes did not decode to an item of type 'bytes'.",
                               *_decode_error_lines(stream.curr_snapshot, details=details))
|
||
def _cid_multibase(cid_head_snapshots: tuple[StreamSnapshot, StreamSnapshot], stream: Stream, cid_bytes: bytes) -> str:
    r"""
    Error message for a CID missing the identity Multibase prefix.

    NOTE(review): ``cid_bytes`` is currently unused here; kept for signature parity with callers.
    """
    highlight_lines = _decode_error_lines(stream.prev_snapshot, stream.curr_snapshot,
                                          details="byte should be 0x00", hl_start=1, hl_len=1)
    return _cid_error_template(cid_head_snapshots,
                               "CID does not start with the identity Multibase prefix.",
                               *highlight_lines)
|
||
def _simple_value(stream: Stream, arg: int) -> str:
    r""" Error message for a disallowed simple value (only 0x14-0x16, i.e. false/true/null). """
    return _decode_error_msg(
        "Error while decoding major type 0x7: allowed simple values are 0x14, 0x15 and 0x16.",
        stream.curr_snapshot,
        details=f"simple value is {arg}",
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
r""" | ||
Utility functions used to produce messages for DAG-CBOR decoding errors. | ||
""" | ||
|
||
from typing import List, Optional | ||
|
||
from ..utils import CBORDecodingError | ||
from ._stream import StreamSnapshot | ||
|
||
_TRUNC_BYTES = 16 | ||
|
||
def _bytes2hex(bs: bytes) -> str:
    r""" Hex string for ``bs``, truncated to first and last byte when longer than ``_TRUNC_BYTES``. """
    if len(bs) > _TRUNC_BYTES:
        # Truncated form has fixed length 7 < 2*_TRUNC_BYTES.
        return f"{bs[:1].hex()}...{bs[-1:].hex()}"
    return bs.hex()
|
||
def _decode_error_lines(*snapshots: StreamSnapshot, details: Optional[str] = None,
                        eof: bool = False,
                        start: Optional[int] = None,
                        end: Optional[int] = None,
                        pad_start: int = 0,
                        pad_end: int = 0,
                        hl_start: int = 0,
                        hl_len: Optional[int] = None,
                        dots: bool = False,
                        ) -> List[str]:
    r"""
    Builds the byte-display lines of a decode error message: an
    ``At byte #<pos>: <hex>`` line and, when ``details`` is given, a second line
    with a ``^`` highlight followed by the details text.

    :param snapshots: one or more snapshots; their latest reads are concatenated.
    :param details: text displayed on the second line; if :obj:`None`, only the byte line is produced.
    :param eof: NOTE(review): currently unused by this body — the ``<EOF>`` marker below is
        driven by the displayed bytes being empty instead; confirm whether that is intentional.
    :param start: first displayed byte index (default 0).
    :param end: one-past-last displayed byte index (default: all bytes).
    :param pad_start: blank columns inserted before the hex string (and the highlight).
    :param pad_end: blank columns appended after the hex string.
    :param hl_start: index of the first highlighted byte.
    :param hl_len: number of highlighted bytes (default: from ``hl_start`` to the end).
    :param dots: if :obj:`True`, appends ``...`` to the byte line.
    """
    # pylint: disable = too-many-locals
    assert snapshots
    bs = bytes()
    pos = snapshots[0].latest_read_start
    for snapshot in snapshots:
        bs += snapshot.latest_read
    if start is None:
        start = 0
    if end is None:
        end = len(bs)
    assert 0 <= start <= end <= len(bs)
    assert pad_start >= 0
    assert pad_end >= 0
    assert hl_start >= 0
    bs = bs[start:end]
    pos += start
    pos_str = str(pos)
    pos_tab = " "*len(pos_str)
    bs_str = _bytes2hex(bs)
    # _bytes2hex truncates long chunks, making the hex string shorter than 2*len(bs).
    truncated = len(bs_str) != 2*len(bs)
    if not bs_str:
        # Nothing to display: show an <EOF> marker and highlight all of it.
        bs_str = "<EOF>"
        bs_tab = "^"*len(bs_str)
    else:
        if hl_len is None:
            hl_len = len(bs)-hl_start
        else:
            # NOTE(review): bound uses len(bs)-start, but bs was already sliced by start above;
            # presumably len(bs)-hl_start was intended — confirm.
            assert 0 <= hl_len <= len(bs)-start
        if truncated and not (hl_len == 1 and (hl_start in {0, len(bs)-1})):
            # Truncated display: byte columns no longer line up, so highlight everything,
            # except when exactly the first or last byte is highlighted (those survive truncation).
            bs_tab = "^"*len(bs_str)
        else:
            # NOTE(review): offset uses one column per skipped byte while each byte renders
            # as two hex characters ("^^" per byte) — verify highlight alignment for hl_start > 0.
            bs_tab = " "*hl_start+"^^"*hl_len
    bs_str = " "*pad_start+bs_str+" "*pad_end
    bs_tab = " "*pad_start+bs_tab
    bytes_line = f"At byte #{pos_str}: {bs_str}"
    if truncated:
        last_byte_idx = pos+len(bs)-1
        bytes_line += f" (last byte #{last_byte_idx})"
    if dots:
        bytes_line += "..."
    descr_line = f" {pos_tab} {bs_tab} {details}"
    lines = [bytes_line]
    if details is not None:
        lines.append(descr_line)
    return lines
|
||
def _decode_error_msg(msg: str, *snapshots: StreamSnapshot, details: Optional[str] = None,
                      eof: bool = False,
                      start: Optional[int] = None,
                      end: Optional[int] = None,
                      hl_start: int = 0,
                      hl_len: Optional[int] = None,
                      dots: bool = False,
                      ) -> str:
    r""" Single-string error message: ``msg`` followed by the annotated byte lines. """
    body = _decode_error_lines(*snapshots, details=details, eof=eof,
                               start=start, end=end,
                               hl_start=hl_start, hl_len=hl_len, dots=dots)
    return "\n".join([msg, *body])
|
||
|
||
def _extract_error_cause_lines(e: CBORDecodingError) -> List[str]:
    r""" Splits the cause error's message into lines, marking the first line with a backslash. """
    result: List[str] = []
    for idx, line in enumerate(str(e).split("\n")):
        marker = r"\ " if idx == 0 else " "
        result.append(marker+line)
    return result
|
||
|
||
def _cid_error_template(cid_head_snapshots: tuple[StreamSnapshot, StreamSnapshot], *explanation: str) -> str:
    r""" Common template for CID decoding errors: header, tag byte lines, then explanation. """
    parts = ["Error while decoding CID."]
    parts.extend(_decode_error_lines(*cid_head_snapshots, details="CID tag", dots=True))
    parts.extend(explanation)
    return "\n".join(parts)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
r""" | ||
Byte-streams and snapshots used in DAG-CBOR decoding, keeping track of latest and previous read byte chunks for error reporting purposes. | ||
""" | ||
from io import BufferedIOBase, BytesIO | ||
from typing import Optional | ||
|
||
class StreamSnapshot:
    r""" An immutable record of a stream's state at some point during decoding. """

    # Start position (in the stream) of the latest chunk read.
    _start: int
    # Latest chunk of bytes read from the stream.
    _chunk: bytes

    def __init__(self, latest_read: bytes, next_read_start: int) -> None:
        # Stored as (chunk, start-of-chunk); the constructor receives the position
        # reached *after* the read, so the start is recovered by subtraction.
        self._chunk = latest_read
        self._start = next_read_start-len(latest_read)

    @property
    def latest_read(self) -> bytes:
        r""" The latest byte chunk read from the stream. """
        return self._chunk

    @property
    def latest_read_size(self) -> int:
        r""" Size of the latest byte chunk read from the stream. """
        return len(self._chunk)

    @property
    def latest_read_start(self) -> int:
        r""" Start position in the stream for the latest byte chunk read. """
        return self._start

    @property
    def num_bytes_read(self) -> int:
        r""" Total number of bytes read so far in the stream. """
        return self._start+len(self._chunk)
|
||
class Stream:
    r"""
    Wrapper for the byte-stream being decoded, keeping track of the latest and
    previous chunks read so that detailed error messages can be produced.
    """

    _buf: BufferedIOBase
    _bs: bytes
    _pos: int
    _prev_bs: bytes
    _prev_pos: int

    def __init__(self, buffer: Optional[BufferedIOBase] = None, init_bytes_read: bytes = bytes()) -> None:
        # Default to an empty in-memory buffer when none is supplied.
        self._buf = BytesIO(bytes()) if buffer is None else buffer
        self._bs = init_bytes_read
        self._pos = len(init_bytes_read)
        self._prev_bs = bytes()
        self._prev_pos = 0

    @property
    def curr_snapshot(self) -> "StreamSnapshot":
        r""" A snapshot of the current state of the stream. """
        return StreamSnapshot(self._bs, self._pos)

    @property
    def prev_snapshot(self) -> "StreamSnapshot":
        r""" A snapshot of the state of the stream immediately before the latest non-extending read. """
        return StreamSnapshot(self._prev_bs, self._prev_pos)

    def read(self, num_bytes: Optional[int] = None, *, extend: bool = False) -> bytes:
        r"""
        Reads ``num_bytes`` bytes from the stream (all remaining bytes if :obj:`None`).
        With ``extend=True`` the bytes just read are appended to the current snapshot
        and the previous snapshot is kept; otherwise the current snapshot becomes the
        previous one and a fresh current snapshot is started from the bytes just read.
        """
        chunk = self._buf.read(num_bytes)
        if not extend:
            self._prev_bs = self._bs
            self._prev_pos = self._pos
            self._bs = chunk
        else:
            self._bs += chunk
        self._pos += len(chunk)
        return chunk
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.