diff --git a/nethsm/__init__.py b/nethsm/__init__.py index f9b9d82..3c1be19 100644 --- a/nethsm/__init__.py +++ b/nethsm/__init__.py @@ -387,7 +387,7 @@ def __init__( def close(self) -> None: self.client.close() # type: ignore[no-untyped-call] - def request( + def _request( self, method: str, endpoint: str, @@ -424,7 +424,7 @@ def request( encoder = json.JSONEncoder() body = encoder.encode(json_obj) - response = self.get_api().api_client.rest_client.request( + response = self._get_api().api_client.rest_client.request( method=method, url=url, headers=headers, body=body ) @@ -437,16 +437,16 @@ def request( ) return response - def get_api(self) -> "DefaultApi": + def _get_api(self) -> "DefaultApi": from .client.apis.tags.default_api import DefaultApi return DefaultApi(self.client) - def get_location(self, headers: HTTPHeaderDict) -> Optional[str]: + def _get_location(self, headers: HTTPHeaderDict) -> Optional[str]: return headers.get("location") - def get_key_id_from_location(self, headers: HTTPHeaderDict) -> str: - location = self.get_location(headers) + def _get_key_id_from_location(self, headers: HTTPHeaderDict) -> str: + location = self._get_location(headers) if not location: raise NetHSMError("Could not determine the ID of the new key") key_id_match = re.fullmatch(f"/api/{self.version}/keys/(.*)", location) @@ -454,8 +454,8 @@ def get_key_id_from_location(self, headers: HTTPHeaderDict) -> str: raise NetHSMError("Could not determine the ID of the new key") return key_id_match[1] - def get_user_id_from_location(self, headers: HTTPHeaderDict) -> str: - location = self.get_location(headers) + def _get_user_id_from_location(self, headers: HTTPHeaderDict) -> str: + location = self._get_location(headers) if not location: raise NetHSMError("Could not determine the ID of the new key") user_id_match = re.fullmatch(f"/api/{self.version}/users/(.*)", location) @@ -470,7 +470,7 @@ def unlock(self, passphrase: str) -> None: passphrase=passphrase, ) try: - self.get_api().unlock_post(request_body) + self._get_api().unlock_post(request_body) except Exception as e: _handle_exception( e, @@ -484,7 +484,7 @@ def unlock(self, passphrase: str) -> None: def lock(self) -> None: try: - self.get_api().lock_post() + self._get_api().lock_post() except Exception as e: _handle_exception( e, @@ -508,7 +508,7 @@ def provision( systemTime=system_time, ) try: - self.get_api().provision_post(request_body) + self._get_api().provision_post(request_body) except Exception as e: _handle_exception( e, @@ -520,7 +520,7 @@ def provision( def list_users(self) -> list[str]: try: - response = self.get_api().users_get() + response = self._get_api().users_get() except Exception as e: _handle_exception( e, @@ -534,7 +534,7 @@ def get_user(self, user_id: str) -> User: path_params = PathParametersDict(UserID=user_id) try: - response = self.get_api().users_user_id_get(path_params=path_params) + response = self._get_api().users_user_id_get(path_params=path_params) except Exception as e: _handle_exception( e, @@ -568,10 +568,12 @@ def add_user( try: if user_id: path_params = PathParametersDict(UserID=user_id) - self.get_api().users_user_id_put(path_params=path_params, body=body) + self._get_api().users_user_id_put(path_params=path_params, body=body) else: - response = self.get_api().users_post(body=body) - user_id = self.get_user_id_from_location(response.response.getheaders()) + response = self._get_api().users_post(body=body) + user_id = self._get_user_id_from_location( + response.response.getheaders() + ) except Exception as e: _handle_exception( e, @@ -593,7 +595,7 @@ def delete_user(self, user_id: str) -> None: try: path_params = PathParametersDict(UserID=user_id) - self.get_api().users_user_id_delete(path_params=path_params) + self._get_api().users_user_id_delete(path_params=path_params) except Exception as e: _handle_exception( e, @@ -615,7 +617,7 @@ def set_passphrase(self, user_id: str, passphrase: str) -> None: path_params = PathParametersDict(UserID=user_id) body = UserPassphrasePostDataDict(passphrase=passphrase) try: - self.get_api().users_user_id_passphrase_post( + self._get_api().users_user_id_passphrase_post( path_params=path_params, body=body ) except Exception as e: @@ -636,7 +638,7 @@ def list_operator_tags(self, user_id: str) -> list[str]: path_params = PathParametersDict(UserID=user_id) try: - response = self.get_api().users_user_id_tags_get(path_params=path_params) + response = self._get_api().users_user_id_tags_get(path_params=path_params) except Exception as e: _handle_exception( e, @@ -655,7 +657,7 @@ def add_operator_tag(self, user_id: str, tag: str) -> None: path_params = PathParametersDict(UserID=user_id, Tag=tag) try: - self.get_api().users_user_id_tags_tag_put(path_params=path_params) + self._get_api().users_user_id_tags_tag_put(path_params=path_params) except Exception as e: _handle_exception( e, @@ -675,7 +677,7 @@ def delete_operator_tag(self, user_id: str, tag: str) -> None: path_params = PathParametersDict(UserID=user_id, Tag=tag) try: - self.get_api().users_user_id_tags_tag_delete(path_params=path_params) + self._get_api().users_user_id_tags_tag_delete(path_params=path_params) except Exception as e: _handle_exception( e, @@ -693,7 +695,7 @@ def add_key_tag(self, key_id: str, tag: str) -> None: path_params = PathParametersDict(KeyID=key_id, Tag=tag) try: - self.get_api().keys_key_id_restrictions_tags_tag_put( + self._get_api().keys_key_id_restrictions_tags_tag_put( path_params=path_params ) except Exception as e: @@ -715,7 +717,7 @@ def delete_key_tag(self, key_id: str, tag: str) -> None: path_params = PathParametersDict(KeyID=key_id, Tag=tag) try: - self.get_api().keys_key_id_restrictions_tags_tag_delete( + self._get_api().keys_key_id_restrictions_tags_tag_delete( path_params=path_params ) except Exception as e: @@ -730,14 +732,14 @@ def delete_key_tag(self, key_id: str, tag: str) -> None: def get_info(self) -> Info: try: - response = self.get_api().info_get() + response = self._get_api().info_get() except Exception as e: _handle_exception(e) return Info(vendor=response.body.vendor, product=response.body.product) def get_state(self) -> State: try: - response = self.get_api().health_state_get() + response = self._get_api().health_state_get() except Exception as e: _handle_exception(e) return State.from_string(response.body.state) @@ -747,7 +749,7 @@ def get_random_data(self, n: int) -> str: body = RandomRequestDataDict(length=n) try: - response = self.get_api().random_post(body=body) + response = self._get_api().random_post(body=body) except Exception as e: _handle_exception( e, @@ -761,7 +763,7 @@ def get_random_data(self, n: int) -> str: def get_metrics(self) -> Mapping[str, Any]: try: - response = self.get_api().metrics_get() + response = self._get_api().metrics_get() except Exception as e: _handle_exception(e, state=State.OPERATIONAL, roles=[Role.METRICS]) return response.body @@ -772,9 +774,9 @@ def list_keys(self, filter: Optional[str] = None) -> list[str]: try: if filter: query_params = QueryParametersDict(filter=filter) - response = self.get_api().keys_get(query_params=query_params) + response = self._get_api().keys_get(query_params=query_params) else: - response = self.get_api().keys_get() + response = self._get_api().keys_get() except Exception as e: _handle_exception( e, @@ -788,7 +790,7 @@ def get_key(self, key_id: str) -> Key: path_params = PathParametersDict(KeyID=key_id) try: - response = self.get_api().keys_key_id_get(path_params=path_params) + response = self._get_api().keys_key_id_get(path_params=path_params) key = response.body except Exception as e: _handle_exception( @@ -822,7 +824,7 @@ def get_key_public_key(self, key_id: str) -> str: path_params = PathParametersDict(KeyID=key_id) try: - response = self.get_api().keys_key_id_public_pem_get( + response = self._get_api().keys_key_id_public_pem_get( path_params=path_params, skip_deserialization=True ) except Exception as e: @@ -896,18 +898,18 @@ def add_key( ) path_params = PathParametersDict(KeyID=key_id) - self.get_api().keys_key_id_put( + self._get_api().keys_key_id_put( path_params=path_params, body=body, content_type="application/json", ) else: - response = self.get_api().keys_post( + response = self._get_api().keys_post( body=body, content_type="application/json", skip_deserialization=True, ) - key_id = self.get_key_id_from_location(response.response.getheaders()) + key_id = self._get_key_id_from_location(response.response.getheaders()) except Exception as e: _handle_exception( e, @@ -925,7 +927,7 @@ def delete_key(self, key_id: str) -> None: path_params = PathParametersDict(KeyID=key_id) try: - self.get_api().keys_key_id_delete(path_params=path_params) + self._get_api().keys_key_id_delete(path_params=path_params) except Exception as e: _handle_exception( e, @@ -966,7 +968,7 @@ def generate_key( length=length, ) try: - response = self.get_api().keys_generate_post( + response = self._get_api().keys_generate_post( body=body, skip_deserialization=True ) except Exception as e: @@ -980,12 +982,12 @@ def generate_key( }, ) return key_id or str( - self.get_key_id_from_location(response.response.getheaders()) + self._get_key_id_from_location(response.response.getheaders()) ) def get_config_logging(self) -> LoggingConfig: try: - response = self.get_api().config_logging_get() + response = self._get_api().config_logging_get() except Exception as e: _handle_exception(e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR]) return LoggingConfig( @@ -996,7 +998,7 @@ def get_config_logging(self) -> LoggingConfig: def get_config_network(self) -> NetworkConfig: try: - response = self.get_api().config_network_get() + response = self._get_api().config_network_get() except Exception as e: _handle_exception(e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR]) return NetworkConfig( @@ -1007,21 +1009,21 @@ def get_config_network(self) -> NetworkConfig: def get_config_time(self) -> str: try: - response = self.get_api().config_time_get() + response = self._get_api().config_time_get() except Exception as e: _handle_exception(e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR]) return response.body.time def get_config_unattended_boot(self) -> str: try: - response = self.get_api().config_unattended_boot_get() + response = self._get_api().config_unattended_boot_get() except Exception as e: _handle_exception(e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR]) return response.body.status def get_public_key(self) -> str: try: - response = self.get_api().config_tls_public_pem_get( + response = self._get_api().config_tls_public_pem_get( skip_deserialization=True ) except Exception as e: @@ -1030,7 +1032,9 @@ def get_public_key(self) -> str: def get_certificate(self) -> str: try: - response = self.get_api().config_tls_cert_pem_get(skip_deserialization=True) + response = self._get_api().config_tls_cert_pem_get( + skip_deserialization=True + ) except Exception as e: _handle_exception(e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR]) return response.response.data.decode("utf-8") @@ -1043,7 +1047,7 @@ def get_key_certificate(self, key_id: str) -> bytes: path_params = PathParametersDict(KeyID=key_id) - response = self.get_api().keys_key_id_cert_get( + response = self._get_api().keys_key_id_cert_get( path_params=path_params, skip_deserialization=True ) except Exception as e: @@ -1061,7 +1065,7 @@ def get_key_certificate(self, key_id: str) -> bytes: def set_certificate(self, cert: Bytes) -> None: try: - self.request( + self._request( "PUT", "config/tls/cert.pem", data=cert, @@ -1085,7 +1089,7 @@ def set_key_certificate(self, key_id: str, cert: Bytes) -> None: path_params = PathParametersDict(KeyID=key_id) - self.get_api().keys_key_id_cert_put(body=cert, path_params=path_params) + self._get_api().keys_key_id_cert_put(body=cert, path_params=path_params) except Exception as e: _handle_exception( e, @@ -1106,7 +1110,7 @@ def delete_key_certificate(self, key_id: str) -> None: path_params = PathParametersDict(KeyID=key_id) try: - self.get_api().keys_key_id_cert_delete(path_params=path_params) + self._get_api().keys_key_id_cert_delete(path_params=path_params) except Exception as e: _handle_exception( e, @@ -1135,7 +1139,7 @@ def csr( "emailAddress": email_address, } try: - response = self.get_api().config_tls_csr_pem_post( + response = self._get_api().config_tls_csr_pem_post( body=body, skip_deserialization=True ) except Exception as e: @@ -1158,7 +1162,7 @@ def generate_tls_key( ) try: - self.get_api().config_tls_generate_post(body=body) + self._get_api().config_tls_generate_post(body=body) except Exception as e: _handle_exception(e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR]) @@ -1188,7 +1192,7 @@ def key_csr( "emailAddress": email_address, } try: - response = self.get_api().keys_key_id_csr_pem_post( + response = self._get_api().keys_key_id_csr_pem_post( path_params=path_params, body=body, skip_deserialization=True ) except Exception as e: @@ -1213,7 +1217,7 @@ def set_backup_passphrase( newPassphrase=new_passphrase, currentPassphrase=current_passphrase or "" ) try: - self.get_api().config_backup_passphrase_put(body=body) + self._get_api().config_backup_passphrase_put(body=body) except Exception as e: _handle_exception( e, @@ -1235,7 +1239,7 @@ def set_unlock_passphrase( newPassphrase=new_passphrase, currentPassphrase=current_passphrase ) try: - self.get_api().config_unlock_passphrase_put(body=body) + self._get_api().config_unlock_passphrase_put(body=body) except Exception as e: _handle_exception( e, @@ -1258,7 +1262,7 @@ def set_logging_config( ipAddress=ip_address, port=port, logLevel=log_level.value ) try: - self.get_api().config_logging_put(body=body) + self._get_api().config_logging_put(body=body) except Exception as e: _handle_exception( e, @@ -1274,7 +1278,7 @@ def set_network_config(self, ip_address: str, netmask: str, gateway: str) -> Non body = NetworkConfigDict(ipAddress=ip_address, netmask=netmask, gateway=gateway) try: - self.get_api().config_network_put(body=body) + self._get_api().config_network_put(body=body) except Exception as e: _handle_exception( e, @@ -1290,7 +1294,7 @@ def set_time(self, time: Union[str, datetime]) -> None: body = TimeConfigDict(time=time) try: - self.get_api().config_time_put(body=body) + self._get_api().config_time_put(body=body) except Exception as e: _handle_exception( e, @@ -1308,7 +1312,7 @@ def set_unattended_boot(self, status: UnattendedBootStatus) -> None: body = UnattendedBootConfigDict(status=status.value) try: - self.get_api().config_unattended_boot_put(body=body) + self._get_api().config_unattended_boot_put(body=body) except Exception as e: _handle_exception( e, @@ -1321,7 +1325,7 @@ def set_unattended_boot(self, status: UnattendedBootStatus) -> None: def get_system_info(self) -> SystemInfo: try: - response = self.get_api().system_info_get() + response = self._get_api().system_info_get() except Exception as e: _handle_exception( e, @@ -1337,7 +1341,7 @@ def get_system_info(self) -> SystemInfo: def backup(self) -> bytes: try: - response = self.get_api().system_backup_post() + response = self._get_api().system_backup_post() except Exception as e: _handle_exception( e, @@ -1363,7 +1367,7 @@ def restore(self, backup: Bytes, passphrase: str, time: datetime) -> None: backup_file=backup, ) - self.get_api().system_restore_post(body) + self._get_api().system_restore_post(body) except Exception as e: _handle_exception( e, @@ -1375,7 +1379,7 @@ def restore(self, backup: Bytes, passphrase: str, time: datetime) -> None: def update(self, image: Bytes) -> str: try: - response = self.get_api().system_update_post(body=image) + response = self._get_api().system_update_post(body=image) except Exception as e: _handle_exception( e, @@ -1398,7 +1402,7 @@ def update(self, image: Bytes) -> str: def cancel_update(self) -> None: try: - self.get_api().system_cancel_update_post() + self._get_api().system_cancel_update_post() except Exception as e: _handle_exception( e, @@ -1408,7 +1412,7 @@ def cancel_update(self) -> None: def commit_update(self) -> None: try: - self.get_api().system_commit_update_post() + self._get_api().system_commit_update_post() except Exception as e: _handle_exception( e, @@ -1418,7 +1422,7 @@ def commit_update(self) -> None: def reboot(self) -> None: try: - self.get_api().system_reboot_post() + self._get_api().system_reboot_post() except Exception as e: _handle_exception( e, @@ -1428,7 +1432,7 @@ def reboot(self) -> None: def shutdown(self) -> None: try: - self.get_api().system_shutdown_post() + self._get_api().system_shutdown_post() except Exception as e: _handle_exception( e, @@ -1438,7 +1442,7 @@ def shutdown(self) -> None: def factory_reset(self) -> None: try: - self.get_api().system_factory_reset_post() + self._get_api().system_factory_reset_post() except Exception as e: _handle_exception( e, @@ -1459,7 +1463,7 @@ def encrypt( path_params = PathParametersDict(KeyID=key_id) body = EncryptRequestDataDict(message=data, mode=mode.value, iv=iv) try: - response = self.get_api().keys_key_id_encrypt_post( + response = self._get_api().keys_key_id_encrypt_post( path_params=path_params, body=body ) except Exception as e: @@ -1495,7 +1499,7 @@ def decrypt( path_params = PathParametersDict(KeyID=key_id) try: - response = self.get_api().keys_key_id_decrypt_post( + response = self._get_api().keys_key_id_decrypt_post( path_params=path_params, body=body ) except Exception as e: @@ -1524,7 +1528,7 @@ def sign( path_params = PathParametersDict(KeyID=key_id) body = SignRequestDataDict(message=data, mode=mode.value) try: - response = self.get_api().keys_key_id_sign_post( + response = self._get_api().keys_key_id_sign_post( path_params=path_params, body=body ) except Exception as e: