Skip to content

Commit

Permalink
v0.3.1
Browse files Browse the repository at this point in the history
Gave more expressive names to `dag_cbor.ipld` type aliases, removed `ipld` import from `dag_cbor`.
  • Loading branch information
sg495 committed Feb 26, 2023
1 parent 8671307 commit 9f86297
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 117 deletions.
7 changes: 3 additions & 4 deletions dag_cbor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@

from __future__ import annotations # See https://peps.python.org/pep-0563/

__version__ = "0.3.0"
__version__ = "0.3.1"

from . import ipld
from .ipld import Kind, Path
from .ipld import IPLDKind, IPLDScalarKind, ObjPath
from .encoding import encode
from .decoding import decode

# explicit re-exports
__all__ = ["encode", "decode", "ipld", "Kind", "Path"]
__all__ = ["encode", "decode", "IPLDKind", "IPLDScalarKind", "ObjPath"]
14 changes: 7 additions & 7 deletions dag_cbor/decoding/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from multiformats import multicodec, CID, varint

from ..ipld import Kind
from ..ipld import IPLDKind
from ..encoding import _dag_cbor_code
from .err import CBORDecodingError, DAGCBORDecodingError
from . import _err
Expand All @@ -27,7 +27,7 @@ class DecodeCallback(Protocol):
r"""
Type of optional callbacks for the :func:`decode` function.
"""
def __call__(self, value: Kind, num_bytes_read: int) -> None:
def __call__(self, value: IPLDKind, num_bytes_read: int) -> None:
...

class _DecodeOptions(TypedDict, total=False):
Expand All @@ -43,7 +43,7 @@ def decode(stream_or_bytes: Union[BufferedIOBase, bytes], *,
allow_concat: bool = False,
callback: Optional["DecodeCallback"] = None,
require_multicodec: bool = False,
normalize_strings: Literal["NFC", "NFKC", "NFD", "NFKD", None] = None) -> Kind:
normalize_strings: Literal["NFC", "NFKC", "NFD", "NFKD", None] = None) -> IPLDKind:
r"""
Decodes and returns a single data item from the given ``stream_or_bytes``, with the DAG-CBOR codec.
Expand Down Expand Up @@ -131,9 +131,9 @@ def decode(stream_or_bytes: Union[BufferedIOBase, bytes], *,
raise DAGCBORDecodingError(_err._multiple_top_level_items(stream))
return data

def _decode_item(stream: Stream, options: _DecodeOptions) -> Tuple[Kind, int]:
def _decode_item(stream: Stream, options: _DecodeOptions) -> Tuple[IPLDKind, int]:
major_type, arg, num_bytes_read = _decode_head(stream)
ret: Optional[Tuple[Kind, int]] = None
ret: Optional[Tuple[IPLDKind, int]] = None
assert 0x0 <= major_type <= 0x7, f"Major type must be one of 0x0-0x7, found 0x{major_type:x} instead."
if isinstance(arg, float):
# Major type 0x7 (float case):
Expand Down Expand Up @@ -234,7 +234,7 @@ def _decode_list(stream: Stream, length: int, options: _DecodeOptions) -> Tuple[
def _decode_dict_key(stream: Stream, key_idx: int, dict_length: int, options: _DecodeOptions) -> Tuple[str, int, bytes]:
# pylint: disable = too-many-return-statements, too-many-branches
major_type, arg, num_bytes_read = _decode_head(stream)
ret: Optional[Tuple[Kind, int]] = None
ret: Optional[Tuple[IPLDKind, int]] = None
if major_type != 0x3:
raise DAGCBORDecodingError(_err._dict_key_type(stream, major_type))
assert not isinstance(arg, float)
Expand Down Expand Up @@ -309,7 +309,7 @@ def _decode_bool_none(stream: Stream, arg: int, options: _DecodeOptions) -> Tupl
def _decode_dummy(stream: Stream, arg: int, options: _DecodeOptions) -> Tuple[None, int]:
assert False, f"Major type {arg} does not have an associated decoder."

_decoders: Tuple[Callable[[Stream, int, _DecodeOptions], Tuple[Kind, int]], ...] = (
_decoders: Tuple[Callable[[Stream, int, _DecodeOptions], Tuple[IPLDKind, int]], ...] = (
_decode_dummy,
_decode_dummy,
_decode_bytes,
Expand Down
4 changes: 2 additions & 2 deletions dag_cbor/decoding/_err.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from multiformats import varint

from ..ipld import Kind
from ..ipld import IPLDKind
from ..encoding import _dag_cbor_code
from .err import CBORDecodingError
from ._stream import Stream, StreamSnapshot
Expand Down Expand Up @@ -139,7 +139,7 @@ def _invalid_tag(stream: Stream, arg: int) -> str:
def _cid(cid_head_snapshots: Tuple[StreamSnapshot, StreamSnapshot], e: CBORDecodingError) -> str:
return _cid_error_template(cid_head_snapshots, *_extract_error_cause_lines(e))

def _cid_bytes(cid_head_snapshots: Tuple[StreamSnapshot, StreamSnapshot], stream: Stream, cid_bytes: Kind) -> str:
def _cid_bytes(cid_head_snapshots: Tuple[StreamSnapshot, StreamSnapshot], stream: Stream, cid_bytes: IPLDKind) -> str:
decoded_type = type(cid_bytes).__name__
details = f"decodes to an item of type {repr(decoded_type)}"
explanation = [
Expand Down
36 changes: 18 additions & 18 deletions dag_cbor/encoding/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from multiformats import varint, multicodec, CID

from ..ipld import Kind, Path
from ..ipld import IPLDKind, ObjPath
from .err import CBOREncodingError, DAGCBOREncodingError

__all__ = ("CBOREncodingError", "DAGCBOREncodingError")
Expand Down Expand Up @@ -44,20 +44,20 @@ def canonical_order_dict(value: Dict[str, Any]) -> Dict[str, Any]:


@overload
def encode(data: Kind, stream: None = None, *,
def encode(data: IPLDKind, stream: None = None, *,
include_multicodec: bool = False,
normalize_strings: Optional[Literal["NFC", "NFKC", "NFD", "NFKD"]] = None
) -> bytes:
... # pragma: no cover

@overload
def encode(data: Kind, stream: BufferedIOBase, *,
def encode(data: IPLDKind, stream: BufferedIOBase, *,
include_multicodec: bool = False,
normalize_strings: Optional[Literal["NFC", "NFKC", "NFD", "NFKD"]] = None
) -> int:
... # pragma: no cover

def encode(data: Kind, stream: Optional[BufferedIOBase] = None, *,
def encode(data: IPLDKind, stream: Optional[BufferedIOBase] = None, *,
include_multicodec: bool = False,
normalize_strings: Optional[Literal["NFC", "NFKC", "NFD", "NFKD"]] = None
) -> Union[bytes, int]:
Expand All @@ -68,7 +68,7 @@ def encode(data: Kind, stream: Optional[BufferedIOBase] = None, *,
.. code-block:: python
def encode(data: Kind, stream: None = None) -> bytes:
def encode(data: IPLDKind, stream: None = None) -> bytes:
...
Example usage:
Expand All @@ -80,7 +80,7 @@ def encode(data: Kind, stream: None = None) -> bytes:
.. code-block:: python
def encode(data: Kind, stream: BufferedIOBase) -> int:
def encode(data: IPLDKind, stream: BufferedIOBase) -> int:
...
Example usage with a stream:
Expand Down Expand Up @@ -111,7 +111,7 @@ def encode(data: Kind, stream: BufferedIOBase) -> int:
if normalize_strings is not None:
validate(normalize_strings, Literal["NFC", "NFKC", "NFD", "NFKD"])
options["normalize_strings"] = normalize_strings
path = Path()
path = ObjPath()
if stream is None:
internal_stream = BytesIO()
if include_multicodec:
Expand All @@ -131,7 +131,7 @@ class _EncodeOptions(TypedDict, total=False):
normalize_strings: Literal["NFC", "NFKC", "NFD", "NFKD"]
r""" Optional Unicode normalization to be performed on UTF-8 strings prior to byte encoding. """

def _encode(stream: BufferedIOBase, value: Kind, path: Path, options: _EncodeOptions) -> int:
def _encode(stream: BufferedIOBase, value: IPLDKind, path: ObjPath, options: _EncodeOptions) -> int:
# pylint: disable = too-many-return-statements, too-many-branches
if isinstance(value, bool): # must go before int check
# major type 0x7 (additional info 20 and 21)
Expand Down Expand Up @@ -182,7 +182,7 @@ def _encode_head(stream: BufferedIOBase, major_type: int, arg: int) -> int:
stream.write(head)
return len(head)

def _encode_int(stream: BufferedIOBase, value: int, path: Path, options: _EncodeOptions) -> int:
def _encode_int(stream: BufferedIOBase, value: int, path: ObjPath, options: _EncodeOptions) -> int:
if value >= 18446744073709551616:
# unsigned int must be < 2**64
err = f"Error encoding integer value at {path}: Unsigned integer out of range."
Expand All @@ -197,26 +197,26 @@ def _encode_int(stream: BufferedIOBase, value: int, path: Path, options: _Encode
# negative int
return _encode_head(stream, 0x1, -1-value)

def _encode_bytes(stream: BufferedIOBase, value: bytes, path: Path, options: _EncodeOptions) -> int:
def _encode_bytes(stream: BufferedIOBase, value: bytes, path: ObjPath, options: _EncodeOptions) -> int:
num_head_bytes = _encode_head(stream, 0x2, len(value))
stream.write(value)
return num_head_bytes+len(value)

def _encode_str(stream: BufferedIOBase, value: str, path: Path, options: _EncodeOptions) -> int:
def _encode_str(stream: BufferedIOBase, value: str, path: ObjPath, options: _EncodeOptions) -> int:
if "normalize_strings" in options:
value = unicodedata.normalize(options["normalize_strings"], value)
utf8_value: bytes = value.encode("utf-8", errors="strict")
num_head_bytes = _encode_head(stream, 0x3, len(utf8_value))
stream.write(utf8_value)
return num_head_bytes+len(utf8_value)

def _encode_list(stream: BufferedIOBase, value: List[Any], path: Path, options: _EncodeOptions) -> int:
def _encode_list(stream: BufferedIOBase, value: List[Any], path: ObjPath, options: _EncodeOptions) -> int:
num_bytes_written = _encode_head(stream, 0x4, len(value))
for idx, item in enumerate(value):
num_bytes_written += _encode(stream, item, path/idx, options)
return num_bytes_written

def _encode_dict(stream: BufferedIOBase, value: Dict[str, Any], path: Path, options: _EncodeOptions) -> int:
def _encode_dict(stream: BufferedIOBase, value: Dict[str, Any], path: ObjPath, options: _EncodeOptions) -> int:
_check_key_compliance(value, path)
if "normalize_strings" in options:
nf = options["normalize_strings"]
Expand All @@ -234,18 +234,18 @@ def _encode_dict(stream: BufferedIOBase, value: Dict[str, Any], path: Path, opti
num_bytes_written += _encode(stream, v, path/k, options)
return num_bytes_written

def _encode_cid(stream: BufferedIOBase, value: CID, path: Path, options: _EncodeOptions) -> int:
def _encode_cid(stream: BufferedIOBase, value: CID, path: ObjPath, options: _EncodeOptions) -> int:
num_bytes_written = _encode_head(stream, 0x6, 42)
num_bytes_written += _encode_bytes(stream, b"\0" + bytes(value), path, options)
return num_bytes_written

def _encode_bool(stream: BufferedIOBase, value: bool, path: Path, options: _EncodeOptions) -> int:
def _encode_bool(stream: BufferedIOBase, value: bool, path: ObjPath, options: _EncodeOptions) -> int:
return _encode_head(stream, 0x7, 21 if value else 20)

def _encode_none(stream: BufferedIOBase, value: None, path: Path, options: _EncodeOptions) -> int:
def _encode_none(stream: BufferedIOBase, value: None, path: ObjPath, options: _EncodeOptions) -> int:
return _encode_head(stream, 0x7, 22)

def _encode_float(stream: BufferedIOBase, value: float, path: Path, options: _EncodeOptions) -> int:
def _encode_float(stream: BufferedIOBase, value: float, path: ObjPath, options: _EncodeOptions) -> int:
if math.isnan(value):
err = f"Error encoding float value at {path}: NaN is not allowed."
raise DAGCBOREncodingError(err)
Expand All @@ -258,7 +258,7 @@ def _encode_float(stream: BufferedIOBase, value: float, path: Path, options: _En
stream.write(head)
return len(head)

def _check_key_compliance(value: Dict[str, Any], path: Optional[Path] = None) -> None:
def _check_key_compliance(value: Dict[str, Any], path: Optional[ObjPath] = None) -> None:
""" Check keys for DAG-CBOR compliance. """
for idx, k in enumerate(value.keys()):
if not isinstance(k, str):
Expand Down
Loading

0 comments on commit 9f86297

Please sign in to comment.