Skip to content

Commit

Permalink
Support different types of bytes input (#82)
Browse files Browse the repository at this point in the history
Previously, we required bytes input as a BufferedReader.  With this
patch, we also accept FileIO and bytes instances, as supported by the
generated API client.

Fixes: #63
  • Loading branch information
robin-nitrokey authored Nov 17, 2023
1 parent bcaaedb commit ff2e185
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
20 changes: 13 additions & 7 deletions nethsm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from base64 import b64encode
from dataclasses import dataclass
from datetime import datetime
from io import BufferedReader
from io import BufferedReader, FileIO
from typing import TYPE_CHECKING, Any, Iterator, Literal, Mapping, Optional, Union, cast
from urllib.parse import urlencode

Expand All @@ -31,6 +31,9 @@
from .client.apis.tags.default_api import DefaultApi


Bytes = Union[BufferedReader, FileIO, bytes]


class Role(enum.Enum):
ADMINISTRATOR = "Administrator"
OPERATOR = "Operator"
Expand Down Expand Up @@ -363,7 +366,7 @@ def request(
method: str,
endpoint: str,
params: Optional[dict[str, str]] = None,
data: Optional[BufferedReader] = None,
data: Optional[Bytes] = None,
mime_type: Optional[str] = "application/json",
json_obj: Optional[Any] = None,
) -> HTTPResponse:
Expand All @@ -387,7 +390,10 @@ def request(

body: Union[str, bytes, None] = None
if data:
body = data.read(-1)
if isinstance(data, bytes):
body = data
else:
body = data.read()
elif json_obj:
encoder = json.JSONEncoder()
body = encoder.encode(json_obj)
Expand Down Expand Up @@ -1016,7 +1022,7 @@ def get_key_certificate(self, key_id: str) -> bytes:
)
return response.response.data

def set_certificate(self, cert: BufferedReader) -> None:
def set_certificate(self, cert: Bytes) -> None:
try:
self.request(
"PUT",
Expand All @@ -1034,7 +1040,7 @@ def set_certificate(self, cert: BufferedReader) -> None:
},
)

def set_key_certificate(self, key_id: str, cert: BufferedReader) -> None:
def set_key_certificate(self, key_id: str, cert: Bytes) -> None:
try:
from .client.paths.keys_key_id_cert.put.path_parameters import (
PathParametersDict,
Expand Down Expand Up @@ -1304,7 +1310,7 @@ def backup(self) -> bytes:
)
return response.response.data

def restore(self, backup: BufferedReader, passphrase: str, time: datetime) -> None:
def restore(self, backup: Bytes, passphrase: str, time: datetime) -> None:
try:
from .client.paths.system_restore.post.request_body.content.multipart_form_data.schema import (
ArgumentsDict,
Expand All @@ -1328,7 +1334,7 @@ def restore(self, backup: BufferedReader, passphrase: str, time: datetime) -> No
},
)

def update(self, image: BufferedReader) -> str:
def update(self, image: Bytes) -> str:
try:
response = self.get_api().system_update_post(body=image)
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_nethsm_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_set_certificate(nethsm: NetHSM) -> None:
C.EMAIL_ADDRESS,
)
cert = self_sign_csr(csr)
nethsm.set_certificate(BytesIO(cert)) # type: ignore
nethsm.set_certificate(cert)

remote_cert = nethsm.get_certificate()
assert cert.decode("utf-8") == remote_cert
Expand Down

0 comments on commit ff2e185

Please sign in to comment.