From ab80af3872ff71512825d35ef2080c2bbd4d3ba4 Mon Sep 17 00:00:00 2001 From: sg495 Date: Fri, 3 Feb 2023 21:56:25 +0000 Subject: [PATCH] Implemented detailed error messages. --- dag_cbor/__init__.py | 2 +- .../{decoding.py => decoding/__init__.py} | 194 +++++++++--------- dag_cbor/decoding/_err.py | 156 ++++++++++++++ dag_cbor/decoding/_err_utils.py | 99 +++++++++ dag_cbor/decoding/_stream.py | 86 ++++++++ dag_cbor/encoding.py | 11 - dag_cbor/random.py | 2 +- dag_cbor/utils.py | 8 - docs/getting-started.rst | 9 + docs/make-api.json | 6 +- docs/make-api.py | 2 + setup.cfg | 1 + test/test_01_encode_decode_eq_original.py | 2 +- test_error_messages.py | 131 ++++++++++++ 14 files changed, 591 insertions(+), 118 deletions(-) rename dag_cbor/{decoding.py => decoding/__init__.py} (56%) create mode 100644 dag_cbor/decoding/_err.py create mode 100644 dag_cbor/decoding/_err_utils.py create mode 100644 dag_cbor/decoding/_stream.py create mode 100644 test_error_messages.py diff --git a/dag_cbor/__init__.py b/dag_cbor/__init__.py index 8f5c8a4..de102e1 100644 --- a/dag_cbor/__init__.py +++ b/dag_cbor/__init__.py @@ -2,7 +2,7 @@ Python implementation of the `DAG-CBOR codec `_ specification. 
""" -__version__ = "0.2.2" +__version__ = "0.2.3" from .encoding import encode from .decoding import decode diff --git a/dag_cbor/decoding.py b/dag_cbor/decoding/__init__.py similarity index 56% rename from dag_cbor/decoding.py rename to dag_cbor/decoding/__init__.py index 9ce17a2..4b31777 100644 --- a/dag_cbor/decoding.py +++ b/dag_cbor/decoding/__init__.py @@ -5,13 +5,16 @@ from io import BufferedIOBase, BytesIO import math import struct -from typing import Any, Dict, Callable, List, Optional, Tuple, Union +from typing import Any, Dict, Callable, List, Optional, Sequence, Tuple, Union +from typing_extensions import Literal from typing_validation import validate -from multiformats import multicodec, CID +from multiformats import multicodec, CID, varint -from .encoding import EncodableType, _dag_cbor_code -from .utils import CBORDecodingError, DAGCBORDecodingError +from ..encoding import EncodableType, _dag_cbor_code +from ..utils import CBORDecodingError, DAGCBORDecodingError +from . import _err as err +from ._stream import Stream DecodeCallback = Callable[[EncodableType, int], None] """ Type of optional callbacks for the :func:`decode` function.""" @@ -39,7 +42,7 @@ def decode(stream_or_bytes: Union[BufferedIOBase, bytes], *, 16 >>> stream = BytesIO(encoded_bytes) >>> bytes_read_cnt = BytesReadCounter() - >>> dag_cbor.decode(stream, allow_concat=True, callback=bytes_read_cnt) + >>> dag_cbor.decode(stream, allow_concat=True, callback=bytes_read_cnt) {'a': 12, 'b': 'hello!'} >>> int(bytes_read_cnt) 13 @@ -64,10 +67,10 @@ def decode(stream_or_bytes: Union[BufferedIOBase, bytes], *, :raises ~dag_cbor.utils.CBORDecodingError: while reading the leading byte of a data item head, if no bytes are available :raises ~dag_cbor.utils.CBORDecodingError: while reading the argument bytes of a data item head, - if the expected number of argument bytes is not available + if the expected number of argument bytes is not available :raises ~dag_cbor.utils.CBORDecodingError: while decoding the 
data of a bytestring or string, if the expected number of data bytes is not available :raises ~dag_cbor.utils.CBORDecodingError: while decoding the items of a list or a map (keys and values), - if the expected number of items is not available + if the expected number of items is not available :raises ~dag_cbor.utils.CBORDecodingError: if an invalid utf-8 byte sequence is encountered while attempting to decode a string :raises ~dag_cbor.utils.DAGCBORDecodingError: if attempting to decode the special :obj:`float` values ``NaN``, ``Infinity`` and ``-Infinity`` :raises ~dag_cbor.utils.DAGCBORDecodingError: if the additional info is greater than 27, or different from 27 for major type 7 @@ -79,7 +82,7 @@ def decode(stream_or_bytes: Union[BufferedIOBase, bytes], *, :raises ~dag_cbor.utils.DAGCBORDecodingError: if non-bytestring data is found where CID data is expected (tag 42) :raises ~dag_cbor.utils.DAGCBORDecodingError: if a simple value (major type 7) different from 20 (False), 21 (True) or 22 (None) is encountered :raises ~dag_cbor.utils.DAGCBORDecodingError: if ``require_multicodec`` is set to :obj:`True` and - the bytes are not prefixed by the ``'dag-cbor'`` multicodec code + the bytes are not prefixed by the ``'dag-cbor'`` multicodec code :raises ~dag_cbor.utils.DAGCBORDecodingError: if ``allow_concat`` is set to :obj:`False` and the decoding did not use all available bytes """ @@ -88,69 +91,50 @@ def decode(stream_or_bytes: Union[BufferedIOBase, bytes], *, validate(require_multicodec, bool) # validate(callback, Optional[DecodeCallback]) # TODO: not yet supported by typing_validation if isinstance(stream_or_bytes, bytes): - stream: BufferedIOBase = BytesIO(stream_or_bytes) + _stream: BufferedIOBase = BytesIO(stream_or_bytes) else: - stream = stream_or_bytes + _stream = stream_or_bytes if require_multicodec: - code, _, stream = multicodec.unwrap_raw(stream) + code, _, _stream = multicodec.unwrap_raw(_stream) + stream = Stream(_stream, varint.encode(code)) if code 
!= _dag_cbor_code: - raise DAGCBORDecodingError(f"Required 'dag-cbor' multicodec code {hex(_dag_cbor_code)}, unwrapped code {hex(code)} instead.") - data, _ = _decode_item(stream, callback=callback) - if allow_concat: - return data - remaining_bytes = stream.read() - if len(remaining_bytes) > 0: - raise DAGCBORDecodingError("Encode and decode must operate on a single top-level CBOR object") + raise DAGCBORDecodingError(err._required_multicodec(stream)) + else: + stream = Stream(_stream) + data, _ = _decode_item(stream, callback) + if not allow_concat: + remaining_bytes = stream.read() + if len(remaining_bytes) > 0: + raise DAGCBORDecodingError(err._multiple_top_level_items(stream)) return data -def _decode_item(stream: BufferedIOBase, *, - callback: Optional[DecodeCallback]) -> Tuple[EncodableType, int]: - # pylint: disable = too-many-return-statements, too-many-branches +def _decode_item(stream: Stream, callback: Optional[DecodeCallback]) -> Tuple[EncodableType, int]: major_type, arg, num_bytes_read = _decode_head(stream) ret: Optional[Tuple[EncodableType, int]] = None + assert 0x0 <= major_type <= 0x7, f"Major type must be one of 0x0-0x7, found 0x{major_type:x} instead." if isinstance(arg, float): - # float - assert major_type == 0x7 - if math.isnan(arg): - raise DAGCBORDecodingError("NaN is not an allowed float value.") - if math.isinf(arg): - if arg > 0: - raise DAGCBORDecodingError("Infinity is not an allowed float value.") - raise DAGCBORDecodingError("-Infinity is not an allowed float value.") + # Major type 0x7 (float case): + assert major_type == 0x7, f"Major type for float must be 0x7, found 0x{major_type:x} instead." 
+ if math.isnan(arg) or math.isinf(arg): + raise DAGCBORDecodingError(err._invalid_float(stream, arg)) ret = (arg, num_bytes_read) - elif major_type == 0x0: - ret = (arg, num_bytes_read) # unsigned int - elif major_type == 0x1: - ret = (-1-arg, num_bytes_read) # negative int - elif major_type == 0x2: - value, num_bytes_further_read = _decode_bytes(stream, arg) - ret = (value, num_bytes_read+num_bytes_further_read) - elif major_type == 0x3: - value, num_bytes_further_read = _decode_str(stream, arg) - ret = (value, num_bytes_read+num_bytes_further_read) - elif major_type == 0x4: - value, _ = _decode_list(stream, arg, callback=callback) - ret = (value, num_bytes_read) - elif major_type == 0x5: - value, _ = _decode_dict(stream, arg, callback=callback) - ret = (value, num_bytes_read) - elif major_type == 0x6: - value, num_bytes_further_read = _decode_cid(stream, arg) - ret = (value, num_bytes_read+num_bytes_further_read) - elif major_type == 0x7: - value, _ = _decode_bool_none(stream, arg) - ret = (value, num_bytes_read) + elif major_type <= 0x1: + # Major types 0x0 and 0x1: + ret = (arg if major_type == 0x0 else -1-arg, num_bytes_read) else: - raise RuntimeError("Major type must be one of 0x0-0x7.") + # Major types 0x2-0x6 and 0x7 (bool/null case): + value, num_bytes_further_read = _decoders[major_type](stream, arg, callback) + ret = (value, num_bytes_read+num_bytes_further_read) if callback is not None: callback(*ret) return ret -def _decode_head(stream: BufferedIOBase) -> Tuple[int, Union[int, float], int]: +def _decode_head(stream: Stream) -> Tuple[int, Union[int, float], int]: + # pylint: disable = too-many-branches # read leading byte res = stream.read(1) if len(res) < 1: - raise CBORDecodingError("Unexpected EOF while reading leading byte of data item head.") + raise CBORDecodingError(err._unexpected_eof(stream, what="leading byte of data item head", n=1, include_prev_snapshot=False)) leading_byte = res[0] major_type = leading_byte >> 5 additional_info = 
leading_byte & 0b11111 @@ -159,11 +143,11 @@ def _decode_head(stream: BufferedIOBase) -> Tuple[int, Union[int, float], int]: # argument value = additional info return (major_type, additional_info, 1) if additional_info > 27 or (major_type == 0x7 and additional_info != 27): - raise DAGCBORDecodingError(f"Invalid additional info {additional_info} in data item head for major type {major_type}.") + raise DAGCBORDecodingError(err._invalid_additional_info(stream, additional_info, major_type)) argument_nbytes = 1<<(additional_info-24) res = stream.read(argument_nbytes) if len(res) < argument_nbytes: - raise CBORDecodingError(f"Unexpected EOF while reading {argument_nbytes} byte argument of data item head.") + raise CBORDecodingError(err._unexpected_eof(stream, what=f"{argument_nbytes} byte argument of data item head", n=argument_nbytes)) if additional_info == 24: # 1 byte of unsigned int argument value to follow return (major_type, res[0], 2) @@ -171,13 +155,15 @@ def _decode_head(stream: BufferedIOBase) -> Tuple[int, Union[int, float], int]: # 2 bytes of unsigned int argument value to follow arg = struct.unpack(">H", res)[0] if arg <= 255: - raise DAGCBORDecodingError(f"Integer {arg} was encoded using 2 bytes, while 1 byte would have been enough.") + raise DAGCBORDecodingError(err._excessive_int_size(stream, arg, 2, 1)) return (major_type, arg, 3) if additional_info == 26: # 4 bytes of unsigned int argument value to follow arg = struct.unpack(">L", res)[0] if arg <= 65535: - raise DAGCBORDecodingError(f"Integer {arg} was encoded using 4 bytes, while 2 bytes would have been enough.") + if arg <= 255: + raise DAGCBORDecodingError(err._excessive_int_size(stream, arg, 4, 1)) + raise DAGCBORDecodingError(err._excessive_int_size(stream, arg, 4, 2)) return (major_type, arg, 5) # necessarily additional_info == 27 if major_type == 0x7: @@ -186,102 +172,120 @@ def _decode_head(stream: BufferedIOBase) -> Tuple[int, Union[int, float], int]: # 8 bytes of unsigned int argument value 
to follow arg = struct.unpack(">Q", res)[0] if arg <= 4294967295: - raise DAGCBORDecodingError(f"Integer {arg} was encoded using 8 bytes, while 4 bytes would have been enough.") + if arg <= 255: + raise DAGCBORDecodingError(err._excessive_int_size(stream, arg, 8, 1)) + if arg <= 65535: + raise DAGCBORDecodingError(err._excessive_int_size(stream, arg, 8, 2)) + raise DAGCBORDecodingError(err._excessive_int_size(stream, arg, 8, 4)) return (major_type, arg, 9) -def _decode_bytes(stream: BufferedIOBase, length: int) -> Tuple[bytes, int]: +def _decode_bytes(stream: Stream, length: int, callback: Optional[DecodeCallback]) -> Tuple[bytes, int]: res = stream.read(length) if len(res) < length: - raise CBORDecodingError(f"Unexpected EOF while reading {length} bytes of bytestring.") + raise CBORDecodingError(err._unexpected_eof(stream, what=f"{length} bytes of bytestring", n=length)) return (res, length) -def _decode_str(stream: BufferedIOBase, length: int) -> Tuple[str, int]: +def _decode_str(stream: Stream, length: int, callback: Optional[DecodeCallback]) -> Tuple[str, int]: res = stream.read(length) if len(res) < length: - raise CBORDecodingError(f"Unexpected EOF while reading {length} bytes of string.") + raise CBORDecodingError(err._unexpected_eof(stream, what=f"{length} bytes of string", n=length)) try: s = res.decode(encoding="utf-8", errors="strict") - except UnicodeError as e: - raise CBORDecodingError("Strings must be valid utf-8 strings.") from e + except UnicodeDecodeError as e: + raise CBORDecodingError(err._unicode(stream, length, e.start, e.end, e.reason)) # pylint: disable = raise-missing-from return (s, length) -def _decode_list(stream: BufferedIOBase, length: int, *, - callback: Optional[DecodeCallback]) -> Tuple[List[Any], int]: +def _decode_list(stream: Stream, length: int, callback: Optional[DecodeCallback]) -> Tuple[List[Any], int]: + list_head_snapshot = stream.curr_snapshot l: List[Any] = [] - for i in range(length): + for idx in range(length): try: - 
item, _ = _decode_item(stream, callback=callback) + item, _ = _decode_item(stream, callback) l.append(item) except CBORDecodingError as e: - raise CBORDecodingError(f"Error while decoding item #{i} in list of length {length}.") from e + raise CBORDecodingError(err._list_item(list_head_snapshot, idx, length, e)) # pylint: disable = raise-missing-from return (l, 0) -def _decode_dict_key(stream: BufferedIOBase, key_idx: int, dict_length: int, *, - callback: Optional[DecodeCallback]) -> Tuple[str, int, bytes]: +def _decode_dict_key(stream: Stream, key_idx: int, dict_length: int, callback: Optional[DecodeCallback]) -> Tuple[str, int, bytes]: # pylint: disable = too-many-return-statements, too-many-branches major_type, arg, num_bytes_read = _decode_head(stream) ret: Optional[Tuple[EncodableType, int]] = None if major_type != 0x3: - raise DAGCBORDecodingError(f"Key #{key_idx} in dict of length {dict_length} is of major type {hex(major_type)}, expected 0x3 (string).") + raise DAGCBORDecodingError(err._dict_key_type(stream, major_type)) assert not isinstance(arg, float) str_length = arg str_bytes: bytes = stream.read(str_length) if len(str_bytes) < str_length: - raise CBORDecodingError(f"Unexpected EOF while reading {str_length} bytes of string.") + raise CBORDecodingError(err._unexpected_eof(stream, f"{str_length} bytes of string", str_length)) try: s = str_bytes.decode(encoding="utf-8", errors="strict") - except UnicodeError as e: - raise CBORDecodingError("Strings must be valid utf-8 strings.") from e + except UnicodeDecodeError as e: + raise CBORDecodingError(err._unicode(stream, str_length, e.start, e.end, e.reason)) # pylint: disable = raise-missing-from ret = (s, num_bytes_read+str_length) if callback is not None: callback(*ret) return ret+(str_bytes,) -def _decode_dict(stream: BufferedIOBase, length: int, - callback: Optional[DecodeCallback]) -> Tuple[Dict[str, Any], int]: +def _decode_dict(stream: Stream, length: int, callback: Optional[DecodeCallback]) -> 
Tuple[Dict[str, Any], int]: # pylint: disable = too-many-locals + dict_head_snapshot = stream.curr_snapshot d: Dict[str, Any] = {} key_bytes_list: List[bytes] = [] for i in range(length): try: - k, _, k_bytes = _decode_dict_key(stream, i, length, callback=callback) + k, _, k_bytes = _decode_dict_key(stream, i, length, callback) except CBORDecodingError as e: - raise CBORDecodingError(f"Error while decoding key #{i} in dict of length {length}.") from e + raise CBORDecodingError(err._dict_item(dict_head_snapshot, "key", i, length, e)) # pylint: disable = raise-missing-from + if k in d: + raise DAGCBORDecodingError(err._duplicate_dict_key(dict_head_snapshot, stream, k, i, length)) try: - v, _ = _decode_item(stream, callback=callback) + v, _ = _decode_item(stream, callback) except CBORDecodingError as e: - raise CBORDecodingError(f"Error while decoding value #{i} in dict of length {length}.") from e + raise CBORDecodingError(err._dict_item(dict_head_snapshot, "value", i, length, e)) # pylint: disable = raise-missing-from d[k] = v key_bytes_list.append(k_bytes) - if len(d) != length: - raise DAGCBORDecodingError(f"Found only {len(d)} unique keys out of {length} key-value pairs.") # check that keys are sorted canonically assert len(key_bytes_list) == length sorted_key_bytes_list = sorted(key_bytes_list, key=lambda e: (len(e), e)) - for idx, (k1, k2) in enumerate(zip(key_bytes_list, sorted_key_bytes_list)): - if k1 != k2: - exp_idx = sorted_key_bytes_list.index(k1) - raise DAGCBORDecodingError(f"Dictionary keys not in canonical order: key #{idx} should have been in position #{exp_idx} instead.") + for idx0, (kb0, kb1) in enumerate(zip(key_bytes_list, sorted_key_bytes_list)): + if kb0 != kb1: + idx1 = key_bytes_list.index(kb1) + raise DAGCBORDecodingError(err._dict_key_order(dict_head_snapshot, kb0, idx0, kb1, idx1, length)) return (d, 0) -def _decode_cid(stream: BufferedIOBase, arg: int) -> Tuple[CID, int]: +def _decode_cid(stream: Stream, arg: int, callback: 
Optional[DecodeCallback]) -> Tuple[CID, int]: if arg != 42: - raise DAGCBORDecodingError(f"Error while decoding major type 0x6: tag {arg} is not allowed.") + raise DAGCBORDecodingError(err._invalid_tag(stream, arg)) + cid_head_snapshots = stream.prev_snapshot, stream.curr_snapshot try: cid_bytes, num_bytes_read = _decode_item(stream, callback=None) except CBORDecodingError as e: - raise CBORDecodingError("Error while decoding CID bytes.") from e + raise CBORDecodingError(err._cid(cid_head_snapshots, e)) # pylint: disable = raise-missing-from if not isinstance(cid_bytes, bytes): - raise DAGCBORDecodingError(f"Expected CID bytes, found data of type {type(cid_bytes)} instead.") + raise DAGCBORDecodingError(err._cid_bytes(cid_head_snapshots, stream, cid_bytes)) if not cid_bytes[0] == 0: - raise DAGCBORDecodingError(f"CID does not start with the identity Multibase prefix (0x00).") + raise DAGCBORDecodingError(err._cid_multibase(cid_head_snapshots, stream, cid_bytes)) return (CID.decode(cid_bytes[1:]), num_bytes_read) -def _decode_bool_none(stream: BufferedIOBase, arg: int) -> Tuple[Optional[bool], int]: +def _decode_bool_none(stream: Stream, arg: int, callback: Optional[DecodeCallback]) -> Tuple[Optional[bool], int]: if arg == 20: return (False, 0) if arg == 21: return (True, 0) if arg == 22: return (None, 0) - raise DAGCBORDecodingError(f"Error while decoding major type 0x7: simple value {arg} is not allowed.") + raise DAGCBORDecodingError(err._simple_value(stream, arg)) + +def _decode_dummy(stream: Stream, arg: int, callback: Optional[DecodeCallback]) -> Tuple[None, int]: + assert False, f"Major type {arg} does not have an associated decoder." + +_decoders: tuple[Callable[[Stream, int, Optional[DecodeCallback]], tuple[EncodableType, int]], ...] 
= ( + _decode_dummy, + _decode_dummy, + _decode_bytes, + _decode_str, + _decode_list, + _decode_dict, + _decode_cid, + _decode_bool_none +) diff --git a/dag_cbor/decoding/_err.py b/dag_cbor/decoding/_err.py new file mode 100644 index 0000000..15cae03 --- /dev/null +++ b/dag_cbor/decoding/_err.py @@ -0,0 +1,156 @@ +r""" + Messages for DAG-CBOR decoding errors. +""" + +import math +from typing_extensions import Literal + +from multiformats import varint + +from ..encoding import EncodableType, _dag_cbor_code +from ..utils import CBORDecodingError +from ._stream import Stream, StreamSnapshot +from ._err_utils import _TRUNC_BYTES, _bytes2hex, _decode_error_lines, _decode_error_msg, _extract_error_cause_lines, _cid_error_template + +def _required_multicodec(stream: Stream) -> str: + curr_snapshot = stream.curr_snapshot + msg = "Required 'dag-cbor' multicodec code." + exp_bs = varint.encode(_dag_cbor_code) + details = f"byte{'s' if curr_snapshot.latest_read_size > 1 else ''} should be 0x{exp_bs.hex()}." + return _decode_error_msg(msg, curr_snapshot, details=details) + +def _multiple_top_level_items(stream: Stream) -> str: + msg = "Encode and decode must operate on a single top-level CBOR object." + details = "unexpected start byte of a second top-level CBOR object" + return _decode_error_msg(msg, stream.curr_snapshot, details=details) + +def _invalid_float(stream: Stream, arg: float) -> str: + if math.isnan(arg): + msg = "NaN is not an allowed float value." + float_str = "float('NaN')" + else: + assert math.isinf(arg), "Float must be NaN or infinite." + s = ("" if arg > 0 else "-") + msg = s+"Infinity is not an allowed float value." 
+ float_str = f"float('{s}Infinity')" + details = f"struct.pack('>d', {float_str})" + return _decode_error_msg(msg, stream.curr_snapshot, details=details, hl_start=1) + +def _unexpected_eof(stream: Stream, what: str, n: int, include_prev_snapshot: bool = True) -> str: + prev_snapshot = stream.prev_snapshot if include_prev_snapshot else StreamSnapshot(bytes(), 0) + curr_snapshot = stream.curr_snapshot + msg = f"Unexpected EOF while attempting to read {what}." + bytes_read = curr_snapshot.latest_read_size + hl_start = prev_snapshot.latest_read_size + details = f"{bytes_read} bytes read, out of {n} expected." + snapshots = [prev_snapshot, curr_snapshot] if include_prev_snapshot else [curr_snapshot] + return _decode_error_msg(msg, *snapshots, details=details, eof=True, hl_start=hl_start) + +def _invalid_additional_info(stream: Stream, additional_info: int, major_type: int) -> str: + msg = f"Invalid additional info {additional_info} in data item head for major type 0x{major_type:x}." + if major_type == 0x7: + details = f"lower 5 bits are {additional_info:0>5b}, expected from {0:0>5b} to {23:0>5b}, or {27:0>5b}." + else: + details = f"lower 5 bits are {additional_info:0>5b}, expected from {0:0>5b} to {27:0>5b}." + return _decode_error_msg(msg, stream.curr_snapshot, details=details) + +def _excessive_int_size(stream: Stream, arg: int, bytes_used: int, bytes_sufficient: int) -> str: + s = 's' if bytes_sufficient > 1 else '' + msg = f"Integer {arg} was encoded using {bytes_used} bytes, while {bytes_sufficient} byte{s} would have been enough." + details = f"same as byte{s} 0x{arg:0>{2*bytes_sufficient}x}" + return _decode_error_msg(msg, stream.prev_snapshot, stream.curr_snapshot, details=details, hl_start=1) + +def _unicode(stream: Stream, length: int, start: int, end: int, reason: str) -> str: + prev_snapshot = stream.prev_snapshot + curr_snapshot = stream.curr_snapshot + msg = "String bytes are not valid utf-8 bytes." 
+ lines = [msg] + n = curr_snapshot.latest_read_size + ps = 0 + pe = 0 + if n <= _TRUNC_BYTES: + ps = start + pe = n-end + str_details = f"string of length {length}" + lines.extend(_decode_error_lines(prev_snapshot, curr_snapshot, details=str_details, hl_len=1)) + lines.extend(_decode_error_lines(curr_snapshot, details=reason, start=start, end=end, pad_start=ps+prev_snapshot.latest_read_size, pad_end=pe)) + return "\n".join(lines) + +def _list_item(list_head_snapshot: StreamSnapshot, idx: int, length: int, e: CBORDecodingError) -> str: + lines = [ + "Error while decoding list.", + *_decode_error_lines(list_head_snapshot, details=f"list of length {length}", dots=True), + f"Error occurred while decoding item at position {idx}: further details below.", + *_extract_error_cause_lines(e) + ] + return "\n".join(lines) + +def _dict_key_type(stream: Stream, major_type: int) -> str: + msg = "Dictionary key is not of string type." + details = f"major type is {hex(major_type)}, should be 0x3 (string) instead." 
+ return _decode_error_msg(msg, stream.curr_snapshot, details=details, hl_len=1, dots=True) + +def _dict_item(dict_head_snapshot: StreamSnapshot, item: Literal["key", "value"], idx: int, length: int, e: CBORDecodingError) -> str: + lines = [ + "Error while decoding dict.", + *_decode_error_lines(dict_head_snapshot, details=f"dict of length {length}", dots=True), + f"Error occurred while decoding {item} at position {idx}: further details below.", + *_extract_error_cause_lines(e) + ] + return "\n".join(lines) + +def _duplicate_dict_key(dict_head_snapshot: StreamSnapshot, stream: Stream, k: str, idx: int, length: int) -> str: + lines = [ + "Error while decoding dict.", + *_decode_error_lines(dict_head_snapshot, details=f"dict of length {length}", dots=True), + f"Duplicate key is found at position {idx}.", + *_decode_error_lines(stream.curr_snapshot, details=f"decodes to key {repr(k)}") + ] + return "\n".join(lines) + +def _dict_key_order(dict_head_snapshot: StreamSnapshot, kb0: bytes, idx0: int, kb1: bytes, idx1: int, length: int) -> str: + # pylint: disable = too-many-arguments + pad_len = max(len(str(idx0)), len(str(idx1))) + idx0_str = f"{idx0: >{pad_len}}" + idx1_str = f"{idx1: >{pad_len}}" + lines = [ + "Error while decoding dict.", + *_decode_error_lines(dict_head_snapshot, details=f"dict of length {length}", dots=True), + "Dictionary keys not in canonical order.", + f" Key at pos #{idx0_str}: {_bytes2hex(kb0)}", + f" Key at pos #{idx1_str}: {_bytes2hex(kb1)}", + ] + return "\n".join(lines) + +def _invalid_tag(stream: Stream, arg: int) -> str: + prev_snapshot = stream.prev_snapshot + curr_snapshot = stream.curr_snapshot + msg = "Error while decoding item of major type 0x6: only tag 42 is allowed." 
+ details = f"tag {arg}" + hl_start = prev_snapshot.latest_read_size + return _decode_error_msg(msg, prev_snapshot, curr_snapshot, details=details, hl_start=hl_start) + +def _cid(cid_head_snapshots: tuple[StreamSnapshot, StreamSnapshot], e: CBORDecodingError) -> str: + return _cid_error_template(cid_head_snapshots, *_extract_error_cause_lines(e)) + +def _cid_bytes(cid_head_snapshots: tuple[StreamSnapshot, StreamSnapshot], stream: Stream, cid_bytes: EncodableType) -> str: + decoded_type = type(cid_bytes).__name__ + decoded_type_details = f"decodes to an item of type {repr(decoded_type)}" + explanation = [ + "CID bytes did not decode to an item of type 'bytes'.", + *_decode_error_lines(stream.curr_snapshot, details=decoded_type_details), + ] + return _cid_error_template(cid_head_snapshots, *explanation) + +def _cid_multibase(cid_head_snapshots: tuple[StreamSnapshot, StreamSnapshot], stream: Stream, cid_bytes: bytes) -> str: + error_details = "byte should be 0x00" + explanation = [ + "CID does not start with the identity Multibase prefix.", + *_decode_error_lines(stream.prev_snapshot, stream.curr_snapshot, details=error_details, hl_start=1, hl_len=1), + ] + return _cid_error_template(cid_head_snapshots, *explanation) + +def _simple_value(stream: Stream, arg: int) -> str: + msg = "Error while decoding major type 0x7: allowed simple values are 0x14, 0x15 and 0x16." + details = f"simple value is {arg}" + return _decode_error_msg(msg, stream.curr_snapshot, details=details) diff --git a/dag_cbor/decoding/_err_utils.py b/dag_cbor/decoding/_err_utils.py new file mode 100644 index 0000000..522cbd8 --- /dev/null +++ b/dag_cbor/decoding/_err_utils.py @@ -0,0 +1,99 @@ +r""" + Utility functions used to produce messages for DAG-CBOR decoding errors. 
+""" + +from typing import List, Optional + +from ..utils import CBORDecodingError +from ._stream import StreamSnapshot + +_TRUNC_BYTES = 16 + +def _bytes2hex(bs: bytes) -> str: + if len(bs) <= _TRUNC_BYTES: + return bs.hex() + return bs[:1].hex()+"..."+bs[-1:].hex() # fixed length 7 < 2*_TRUNC_BYTES + +def _decode_error_lines(*snapshots: StreamSnapshot, details: Optional[str] = None, + eof: bool = False, + start: Optional[int] = None, + end: Optional[int] = None, + pad_start: int = 0, + pad_end: int = 0, + hl_start: int = 0, + hl_len: Optional[int] = None, + dots: bool = False, + ) -> List[str]: + # pylint: disable = too-many-locals + assert snapshots + bs = bytes() + pos = snapshots[0].latest_read_start + for snapshot in snapshots: + bs += snapshot.latest_read + if start is None: + start = 0 + if end is None: + end = len(bs) + assert 0 <= start <= end <= len(bs) + assert pad_start >= 0 + assert pad_end >= 0 + assert hl_start >= 0 + bs = bs[start:end] + pos += start + pos_str = str(pos) + pos_tab = " "*len(pos_str) + bs_str = _bytes2hex(bs) + truncated = len(bs_str) != 2*len(bs) + if not bs_str: + bs_str = "" + bs_tab = "^"*len(bs_str) + else: + if hl_len is None: + hl_len = len(bs)-hl_start + else: + assert 0 <= hl_len <= len(bs)-start + if truncated and not (hl_len == 1 and (hl_start in {0, len(bs)-1})): + bs_tab = "^"*len(bs_str) + else: + bs_tab = " "*hl_start+"^^"*hl_len + bs_str = " "*pad_start+bs_str+" "*pad_end + bs_tab = " "*pad_start+bs_tab + bytes_line = f"At byte #{pos_str}: {bs_str}" + if truncated: + last_byte_idx = pos+len(bs)-1 + bytes_line += f" (last byte #{last_byte_idx})" + if dots: + bytes_line += "..." 
+    descr_line = f" {pos_tab} {bs_tab} {details}"
+    lines = [bytes_line]
+    if details is not None:
+        lines.append(descr_line)
+    return lines
+
+def _decode_error_msg(msg: str, *snapshots: StreamSnapshot, details: Optional[str] = None,
+                      eof: bool = False,
+                      start: Optional[int] = None,
+                      end: Optional[int] = None,
+                      hl_start: int = 0,
+                      hl_len: Optional[int] = None,
+                      dots: bool = False,
+                      ) -> str:
+    lines = [msg]  # message first, followed by the lines produced by _decode_error_lines
+    lines.extend(_decode_error_lines(*snapshots, details=details, eof=eof,
+                                     start=start, end=end, hl_start=hl_start, hl_len=hl_len,
+                                     dots=dots))
+    return "\n".join(lines)
+
+
+def _extract_error_cause_lines(e: CBORDecodingError) -> List[str]:
+    lines = str(e).split("\n")  # one entry per line of the error message
+    return [(r"\ " if idx == 0 else " ")+line for idx, line in enumerate(lines)]
+
+
+def _cid_error_template(cid_head_snapshots: "tuple[StreamSnapshot, StreamSnapshot]", *explanation: str) -> str:  # quoted: builtin generics are not subscriptable at runtime before Python 3.9
+    lines = [
+        "Error while decoding CID.",
+        *_decode_error_lines(*cid_head_snapshots, details="CID tag", dots=True),
+        *explanation
+    ]
+    return "\n".join(lines)
diff --git a/dag_cbor/decoding/_stream.py b/dag_cbor/decoding/_stream.py
new file mode 100644
index 0000000..57f70d7
--- /dev/null
+++ b/dag_cbor/decoding/_stream.py
@@ -0,0 +1,86 @@
+r"""
+    Byte-streams and snapshots used in DAG-CBOR decoding, keeping track of latest and previous read byte chunks for error reporting purposes.
+"""
+from io import BufferedIOBase, BytesIO
+from typing import Optional
+
+class StreamSnapshot:
+    r""" A snapshot of the current state of a stream. """
+
+    _bs: bytes
+    _pos: int
+
+    def __new__(cls, latest_read: bytes, next_read_start: int) -> "StreamSnapshot":
+        instance = object.__new__(cls)
+        instance._bs = latest_read
+        instance._pos = next_read_start
+        return instance
+
+    @property
+    def latest_read(self) -> bytes:
+        r""" The latest byte chunk read from the stream. """
+        return self._bs
+
+    @property
+    def latest_read_size(self) -> int:
+        r""" Size of the latest byte chunk read from the stream. """
+        return len(self._bs)
+
+    @property
+    def latest_read_start(self) -> int:
+        r""" Start position in the stream for the latest byte chunk read. """
+        return self._pos-len(self._bs)
+
+    @property
+    def num_bytes_read(self) -> int:
+        r""" Total number of bytes read so far in the stream. """
+        return self._pos
+
+class Stream:
+    r"""
+        Container for the byte-stream being decoded, offering additional book-keeping functionality used to produce detailed error messages.
+    """
+
+    _buf: BufferedIOBase
+    _bs: bytes
+    _pos: int
+    _prev_bs: bytes
+    _prev_pos: int
+
+    def __new__(cls, buffer: Optional[BufferedIOBase] = None, init_bytes_read: bytes = bytes()) -> "Stream":
+        if buffer is None:
+            buffer = BytesIO(bytes())
+        instance = object.__new__(cls)
+        instance._buf = buffer
+        instance._bs = init_bytes_read
+        instance._pos = len(init_bytes_read)  # bytes passed as already read count towards the position
+        instance._prev_bs = bytes()
+        instance._prev_pos = 0
+        return instance
+
+    @property
+    def curr_snapshot(self) -> "StreamSnapshot":
+        r""" A snapshot of the current state of the stream. """
+        return StreamSnapshot(self._bs, self._pos)
+
+    @property
+    def prev_snapshot(self) -> "StreamSnapshot":
+        r""" A snapshot of the state of the stream immediately before the latest non-extending read. """
+        return StreamSnapshot(self._prev_bs, self._prev_pos)
+
+    def read(self, num_bytes: Optional[int] = None, *, extend: bool = False) -> bytes:
+        r"""
+            Read the given number of bytes from the stream. If :obj:`None`, reads all remaining bytes.
+            If ``extend`` is set to :obj:`True`, the current stream snapshot (see :attr:`Stream.curr_snapshot`) is extended with the bytes just read,
+            and the previous stream snapshot (see :attr:`Stream.prev_snapshot`) is kept.
+        """
+        bs = self._buf.read(num_bytes)
+        if extend:
+            self._bs += bs
+            self._pos += len(bs)
+        else:
+            self._prev_bs = self._bs  # non-extending read: the current chunk becomes the previous one
+            self._prev_pos = self._pos
+            self._bs = bs
+            self._pos += len(bs)
+        return bs
diff --git a/dag_cbor/encoding.py b/dag_cbor/encoding.py
index 2d9775c..9cf8675 100644
--- a/dag_cbor/encoding.py
+++ b/dag_cbor/encoding.py
@@ -171,11 +171,6 @@ def _encode_bytes(stream: BufferedIOBase, value: bytes) -> int:
     return num_head_bytes+len(value)
 
 def _encode_str(stream: BufferedIOBase, value: str) -> int:
-    # try:
-    #     utf8_value: bytes = value.encode("utf-8", errors="strict")
-    # except UnicodeError as e:
-    #     raise CBOREncodingError("Strings must be valid utf-8 strings.") from e
-    # # as far as I understand, the above should never raise UnicodeError on "utf-8" encoding
     utf8_value: bytes = value.encode("utf-8", errors="strict")
     num_head_bytes = _encode_head(stream, 0x3, len(utf8_value))
     stream.write(utf8_value)
@@ -189,12 +184,6 @@ def _encode_list(stream: BufferedIOBase, value: List[Any]) -> int:
 
 def _encode_dict(stream: BufferedIOBase, value: Dict[str, Any]) -> int:
     _check_key_compliance(value)
-    # try:
-    #     utf8key_val_pairs = [(k.encode("utf-8", errors="strict"), v)
-    #                          for k, v in value.items()]
-    # except UnicodeError as e:
-    #     raise CBOREncodingError("Strings must be valid utf-8 strings.") from e
-    # # as far as I understand, the above should never raise UnicodeError on "utf-8" encoding
     utf8key_val_pairs = [(k.encode("utf-8", errors="strict"), v)
                          for k, v in value.items()]
     # 1. sort keys canonically:
diff --git a/dag_cbor/random.py b/dag_cbor/random.py
index fe31159..9480359 100644
--- a/dag_cbor/random.py
+++ b/dag_cbor/random.py
@@ -252,7 +252,7 @@ def set_options(*,
 
 def rand_data(n: Optional[int] = None, *, max_nesting: Optional[int] = None) -> Iterator[EncodableType]:
     r"""
-        Generates a stream of random data data.
+        Generates a stream of random data.
:param n: the number of samples to be yielded; if :obj:`None`, an infinite stream is yielded :type n: :obj:`int` or :obj:`None`, *optional* diff --git a/dag_cbor/utils.py b/dag_cbor/utils.py index 1641d8f..1d14528 100644 --- a/dag_cbor/utils.py +++ b/dag_cbor/utils.py @@ -53,11 +53,6 @@ class DAGCBORDecodingError(CBORDecodingError, DAGCBORError): ... def _canonical_order_dict(value: Dict[str, Any]) -> Dict[str, Any]: - # try: - # utf8key_key_val_pairs = [(k.encode("utf-8", errors="strict"), k, v) for k, v in value.items()] - # except UnicodeError as e: - # raise CBOREncodingError("Strings must be valid utf-8 strings.") from e - # # as far as I understand, the above should never raise UnicodeError on "utf-8" encoding utf8key_key_val_pairs = [(k.encode("utf-8", errors="strict"), k, v) for k, v in value.items()] sorted_utf8key_key_val_pairs = sorted(utf8key_key_val_pairs, key=lambda i: (len(i[0]), i[0])) return {k: v for _, k, v in sorted_utf8key_key_val_pairs} @@ -67,9 +62,6 @@ def _check_key_compliance(value: Dict[str, Any]) -> None: """ Check keys for DAG-CBOR compliance. """ if not all(isinstance(k, str) for k in value.keys()): raise DAGCBOREncodingError("Keys for maps must be strings.") - # if len(value.keys()) != len(set(value.keys())): - # raise CBOREncodingError("Keys for maps must be unique.") - # # as far as I understand, the above should never happen for dictionary keys def check_key_compliance(value: Dict[str, Any]) -> None: diff --git a/docs/getting-started.rst b/docs/getting-started.rst index e244859..829c3e9 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -30,6 +30,15 @@ b'\xa2aa\x0cabfhello!' The :mod:`~dag_cbor.random` module contains functions to generate random data compatible with DAG-CBOR encoding. The :mod:`~dag_cbor.utils` module contains errors and utility functions. 
+Please note that :mod:`dag_cbor` internally imports `multiformats <https://github.com/hashberg-io/multiformats>`_: if you'd like to initialise multiformats +with a custom selection of multicodecs/multihashes/multibases, you should call ``multiformats_config.enable()`` **before** you import :mod:`dag_cbor` (see the `multiformats docs <https://multiformats.readthedocs.io/>`_ for further details): + +.. code-block:: python + + import multiformats_config + multiformats_config.enable(codecs=["sha1", 0x29], bases=["base64url", "9"]) + import dag_cbor # internally imports multiformats + The DAG-CBOR codec ------------------ diff --git a/docs/make-api.json b/docs/make-api.json index 8366d0f..c52cb88 100644 --- a/docs/make-api.json +++ b/docs/make-api.json @@ -9,5 +9,9 @@ }, "exclude_members": {}, "include_modules": [], - "exclude_modules": [] + "exclude_modules": [ + "dag_cbor.decoding._err", + "dag_cbor.decoding._err_utils", + "dag_cbor.decoding._stream" + ] } \ No newline at end of file diff --git a/docs/make-api.py b/docs/make-api.py index ef63898..df831f4 100644 --- a/docs/make-api.py +++ b/docs/make-api.py @@ -172,6 +172,8 @@ def make_apidocs() -> None: ] print(f"Writing TOC for API docfiles at {toc_filename}") for mod_name in modules_dict: + if mod_name in exclude_modules: + continue line = f" {apidocs_folder}/{mod_name}" toctable_lines.append(line) print(line) diff --git a/setup.cfg b/setup.cfg index 8ccad0b..66ed3ce 100644 --- a/setup.cfg +++ b/setup.cfg @@ -26,6 +26,7 @@ classifiers = packages = find: python_requires = >=3.7 install_requires = + typing-extensions typing-validation multiformats diff --git a/test/test_01_encode_decode_eq_original.py b/test/test_01_encode_decode_eq_original.py index 0a0cf53..f565946 100644 --- a/test/test_01_encode_decode_eq_original.py +++ b/test/test_01_encode_decode_eq_original.py @@ -92,7 +92,7 @@ def test_list() -> None: assert x == decode(encode(x, include_multicodec=True), require_multicodec=True), error_msg @pytest.mark.parametrize("canonical", [True, False]) -def test_dict(canonical) -> None: +def
test_dict(canonical: bool) -> None:
     """
         Encodes random `dict` samples with `dag_cbor.encoding.encode`,
         encodes them with `cbor2.encoder.dumps` and checks that the two encodings match.
diff --git a/test_error_messages.py b/test_error_messages.py
new file mode 100644
index 0000000..6199b6d
--- /dev/null
+++ b/test_error_messages.py
@@ -0,0 +1,146 @@
+r"""
+    Prints error messages for a variety of decoding failures, to check that the new detailed error messages look all right.
+"""
+# pylint: disable = all
+
+from typing import List, Optional
+from multiformats import varint
+from dag_cbor.random import rand_data
+from dag_cbor import encode, decode
+from dag_cbor.encoding import EncodableType
+from dag_cbor.utils import CBOREncodingError, CBORDecodingError
+
+import random
+
+random.seed(0)
+
+test_cases = [
+    # err._required_multicodec
+    "00",
+    "81e20301",
+    # err._multiple_top_level_items
+    "718301020301",
+    # err._invalid_float
+    "71fb7ff8000000000000",
+    "71fb7ff0000000000000",
+    "71fbfff0000000000000",
+    # err._unexpected_eof
+    "71",
+    "71830102",
+    "71fb3fb99999",
+    "7146"+"7891bc",
+    "7166"+b"hello".hex(),
+    "71a1"+("66"+b"hello".hex()),
+    # err._invalid_additional_info
+    "715c",
+    "71f9",
+    # err._excessive_int_size
+    "7119"+f"{156:0>4x}",
+    "711a"+f"{156:0>8x}",
+    "711a"+f"{32033:0>8x}",
+    "711b"+f"{156:0>16x}",
+    "711b"+f"{32033:0>16x}",
+    "711b"+f"{2305067290:0>16x}",
+    # err._unicode
+    "7161"+b"\xe9".hex(),
+    "7162"+b"\xe9\x80".hex(),
+    "7162"+b"A\xe9".hex(),
+    "7163"+b"AB\xe9".hex(),
+    "7162"+b"\xe9Z".hex(),
+    "7163"+b"\xe9YZ".hex(),
+    "7164"+b"A\xe9YZ".hex(),
+    "7165"+b"AB\xe9YZ".hex(),
+    "7165"+b"AB\xe9\x80YZ".hex(),
+    "7169"+b"ABCD\xe9\x80WXYZ".hex(),
+    "7171"+b"ABCDEFGHIJKLMNO\xe9\x80".hex(),
+    "71a1"+("63"+b"A\xe9Z".hex())+"01",
+    # err._list_item
+    "718401"+("1a"+f"{32033:0>8x}")+"0304",
+    "718401"+("65"+b"A\xe9YZ".hex())+"0304",
+    # err._dict_key_type
+    "71a10101",
+    "71a18301020301",
+    # err._dict_item for a value
+    "71a2"+("65"+b"hello".hex())+"01"+("63"+b"bye".hex())+"fb7ff0000000000000",
+    # err._duplicate_dict_key for a value
+    "71a3"+("65"+b"hello".hex())+"01"+("63"+b"bye".hex())+"02"+("65"+b"hello".hex())+"03",
+    # err._dict_key_order
+    "71a3"+("65"+b"hello".hex())+"01"+("66"+b"whatup".hex())+"02"+("63"+b"bye".hex())+"03",
+    # err._invalid_tag
+    "71d829"+"46"+"7891bc",
+    # err._cid
+    "71d82a"+"46"+"7891bc",
+    # err._cid_bytes
+    "71d82a"+"65"+b"hello".hex(),
+    # err._cid_multibase
+    "71d82a"+"450101030405",
+    # err._simple_value
+    "71f3"
+]
+
+def create_embedding_obj(tag: str) -> EncodableType:
+    r""" Returns a random dict (4+ items) in which one item of a random top-level list value (5+ items) is replaced by ``tag``. """
+    for obj in rand_data(max_nesting=4):
+        if not isinstance(obj, dict):
+            continue
+        if len(obj) < 4:
+            continue
+        list_values = [v for v in obj.values() if isinstance(v, list) and len(v) > 4]
+        if not list_values:
+            continue
+        chosen_list = random.choice(list_values)
+        chosen_list[random.randrange(0, len(chosen_list))] = tag
+        return obj
+    return tag  # fallback, only reached if the random data stream ends without a suitable dict
+
+def deep_embed(test_case: str) -> str:
+    r""" Returns the hex of a random encoded object, prefixed by "71", where a placeholder string is replaced by ``test_case`` minus its first byte. """
+    tag = "0xdeadbeef"
+    obj = create_embedding_obj(tag)
+    obj_bytes = encode(obj).hex()
+    tag_bytes = encode(tag).hex()
+    return "71"+obj_bytes.replace(tag_bytes, test_case[2:])
+
+deep_test_cases = [
+    deep_embed(random.choice(test_cases))
+    for _ in range(10)
+]
+
+def print_decode_error(test_case: str, idx: Optional[int] = None) -> bool:
+    r"""
+        Decodes the given hex-encoded test case and prints the error raised, followed by its chain of causes.
+
+        :param test_case: hex string of the bytes to decode
+        :param idx: index shown in the printed header; if :obj:`None`, the header shows no index
+        :return: :obj:`True` if decoding raised :class:`CBORDecodingError`, :obj:`False` if it succeeded
+    """
+    encoded_bytes = bytes.fromhex(test_case)
+    encoded_bytes_str = encoded_bytes.hex() if encoded_bytes else ""
+    label = "" if idx is None else f" {idx: >2}"  # explicit argument instead of a module-level loop variable
+    print(f"> Error raised by decoding test case{label}:\n{encoded_bytes_str}")
+    print()
+    try:
+        decode(encoded_bytes, require_multicodec=True)
+    except CBORDecodingError as e:
+        print(e)
+        cause = e.__cause__
+        while cause is not None:
+            print(cause)
+            cause = cause.__cause__
+        print()
+        return True
+    return False
+
+def _run_test_cases(title: str, descr: str, cases: List[str]) -> None:
+    r""" Prints the decoding error for each of the given cases, raising :class:`AssertionError` if a case decodes successfully. """
+    print(f"==== {title} ====")
+    print()
+    for idx, test_case in enumerate(cases):
+        # Explicit check rather than `assert`: under `python -O` assert statements are stripped,
+        # which would skip the decode call (and all the printing) altogether.
+        if not print_decode_error(test_case, idx):
+            raise AssertionError(f"Decoding of {descr} {idx} should have raised error.")
+
+if __name__ == "__main__":
+    _run_test_cases("Shallow test cases", "test case", test_cases)
+    _run_test_cases("Deep test cases", "deep test case", deep_test_cases)
\ No newline at end of file