From bcaaedbcc2c95e100eb69bd0b34b403d7c34db19 Mon Sep 17 00:00:00 2001 From: Robin Krahl Date: Fri, 17 Nov 2023 12:21:45 +0100 Subject: [PATCH] Add type checks for tests (#76) Fixes: https://github.com/Nitrokey/nethsm-sdk-py/issues/70 --- Makefile | 2 +- tests/conftest.py | 60 +++++++++---------- tests/test_nethsm_config.py | 42 +++++++------- tests/test_nethsm_keys.py | 59 ++++++++++--------- tests/test_nethsm_other.py | 34 ++++++----- tests/test_nethsm_system.py | 28 ++++----- tests/test_nethsm_users.py | 112 ++++++++++++++++++------------------ tests/utilities.py | 75 ++++++++++++------------ 8 files changed, 213 insertions(+), 199 deletions(-) diff --git a/Makefile b/Makefile index 5101408..44a51ba 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ check-style: check-typing: @echo "Note: run semi-clean target in case this fails without any proper reason" - $(PYTHON3_VENV) -m mypy $(PACKAGE_NAME)/ + $(PYTHON3_VENV) -m mypy $(PACKAGE_NAME)/ tests/ check: check-format check-import-sorting check-style check-typing test diff --git a/tests/conftest.py b/tests/conftest.py index 97dfb6c..172aa6d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,13 @@ +from dataclasses import dataclass from os import environ +from typing import Literal + + +@dataclass +class UserData: + user_id: str + real_name: str + role: Literal["Administrator", "Operator", "Metrics", "Backup"] class Constants: @@ -60,12 +69,12 @@ class Constants: PORT = 514 NETMASK = "255.255.255.0" GATEWAY = "0.0.0.0" - UNATTENDED_BOOT_OFF = "off" - UNATTENDED_BOOT_ON = "on" - LOG_LEVEL = "info" + UNATTENDED_BOOT_OFF: Literal["off"] = "off" + UNATTENDED_BOOT_ON: Literal["on"] = "on" + LOG_LEVEL: Literal["info"] = "info" # test_nethsm_keys - TYPE = "RSA" + TYPE: Literal["RSA"] = "RSA" MECHANISM = [ "RSA_Signature_PKCS1", "RSA_Decryption_PKCS1", @@ -77,7 +86,7 @@ class Constants: KEY_ID_GENERATED = "KeyIdGenerated" KEY_ID_AES = "KeyIdAES" DATA = "Test data 123456" - MODE = "PKCS1" + MODE: Literal["PKCS1"] = "PKCS1" # 'PKCS1', 'PSS_MD5', 'PSS_SHA1', 'PSS_SHA224', 'PSS_SHA256', 'PSS_SHA384', 'PSS_SHA512', 'EdDSA', 'ECDSA' # test_nethsm_users, test_nethsm_keys TAG1 = "Frankfurt" @@ -85,32 +94,23 @@ class Constants: TAG3 = "Teltow" TAGS = [TAG1, TAG2, TAG3] - class AdminUser: - USER_ID = "admin" - REAL_NAME = "admin" - ROLE = "Administrator" - - class AdministratorUser: - USER_ID = "UIAdministrator" - REAL_NAME = "RNAdministrator" - ROLE = "Administrator" - - class OperatorUser: - USER_ID = "UIOperator" - REAL_NAME = "RNOperator" - ROLE = "Operator" - - class MetricsUser: - USER_ID = "UIMetrics" - REAL_NAME = "RNMetrics" - ROLE = "Metrics" - - class BackupUser: - USER_ID = "UIBackup" - REAL_NAME = "RNBackup" - ROLE = "Backup" + ADMIN_USER = UserData(user_id="admin", real_name="admin", role="Administrator") + ADMINISTRATOR_USER = UserData( + user_id="UIAdministrator", real_name="RNAdministrator", role="Administrator" + ) + OPERATOR_USER = UserData( + user_id="UIOperator", real_name="RNOperator", role="Operator" + ) + METRICS_USER = UserData(user_id="UIMetrics", real_name="RNMetrics", role="Metrics") + BACKUP_USER = UserData(user_id="UIBackup", real_name="RNBackup", role="Backup") DETAILS = "" - USERS_LIST = [AdministratorUser, BackupUser, MetricsUser, OperatorUser, AdminUser] + USERS_LIST = [ + ADMINISTRATOR_USER, + BACKUP_USER, + METRICS_USER, + OPERATOR_USER, + ADMIN_USER, + ] # nitropy nethsm --host nethsmdemo.nitrokey.com --no-verify-tls info diff --git a/tests/test_nethsm_config.py b/tests/test_nethsm_config.py index 5693b1f..ac2783b 100644 --- a/tests/test_nethsm_config.py +++ b/tests/test_nethsm_config.py @@ -6,6 +6,7 @@ from utilities import lock, nethsm, self_sign_csr, unlock # noqa: F401 import nethsm as nethsm_module +from nethsm import NetHSM """########## Preparation for the Tests ########## @@ -16,21 +17,21 @@ """ -def get_config_logging(nethsm): +def get_config_logging(nethsm: NetHSM) -> None: data = nethsm.get_config_logging() assert data.ip_address == C.IP_ADDRESS_LOGGING assert data.port == C.PORT assert data.log_level.value == C.LOG_LEVEL -def get_config_network(nethsm): +def get_config_network(nethsm: NetHSM) -> None: data = nethsm.get_config_network() assert data.ip_address == C.IP_ADDRESS_NETWORK assert data.netmask == C.NETMASK assert data.gateway == C.GATEWAY -def get_config_time(nethsm): +def get_config_time(nethsm: NetHSM) -> None: dt_nethsm = datetime.datetime.strptime( nethsm.get_config_time(), "%Y-%m-%dT%H:%M:%SZ" ).replace(tzinfo=datetime.timezone.utc) @@ -48,7 +49,7 @@ def get_config_time(nethsm): """##########Start of Tests##########""" -def test_csr(nethsm): +def test_csr(nethsm: NetHSM) -> None: csr = nethsm.csr( C.COUNTRY, C.STATE_OR_PROVINCE, @@ -61,7 +62,7 @@ def test_csr(nethsm): print(csr) -def test_set_certificate(nethsm: nethsm_module.NetHSM) -> None: +def test_set_certificate(nethsm: NetHSM) -> None: csr = nethsm.csr( C.COUNTRY, @@ -73,18 +74,17 @@ def test_set_certificate(nethsm: nethsm_module.NetHSM) -> None: C.EMAIL_ADDRESS, ) cert = self_sign_csr(csr) - nethsm.set_certificate(BytesIO(cert)) + nethsm.set_certificate(BytesIO(cert)) # type: ignore remote_cert = nethsm.get_certificate() assert cert.decode("utf-8") == remote_cert -def generate_tls_key(nethsm): - resp = nethsm.generate_tls_key("RSA", 2048) - print(resp) +def generate_tls_key(nethsm: NetHSM) -> None: + nethsm.generate_tls_key("RSA", 2048) -def test_get_config_logging(nethsm): +def test_get_config_logging(nethsm: NetHSM) -> None: """Query the configuration of a NetHSM. For logging @@ -94,7 +94,7 @@ def test_get_config_logging(nethsm): get_config_logging(nethsm) -def test_get_config_network(nethsm): +def test_get_config_network(nethsm: NetHSM) -> None: """Query the configuration of a NetHSM. For network @@ -104,7 +104,7 @@ def test_get_config_network(nethsm): get_config_logging(nethsm) -def test_get_config_time(nethsm): +def test_get_config_time(nethsm: NetHSM) -> None: """Query the configuration of a NetHSM. For time @@ -115,7 +115,7 @@ def test_get_config_time(nethsm): get_config_time(nethsm) -def test_get_config_unattended_boot(nethsm): +def test_get_config_unattended_boot(nethsm: NetHSM) -> None: """Query the configuration of a NetHSM. For unattended boot @@ -129,7 +129,7 @@ def test_get_config_unattended_boot(nethsm): ) -def test_get_config_get_public_key(nethsm): +def test_get_config_get_public_key(nethsm: NetHSM) -> None: """Query the configuration of a NetHSM. For get public key @@ -144,7 +144,7 @@ def test_get_config_get_public_key(nethsm): assert str_end == "-----END PUBLIC KEY-----" -def test_get_config_get_certificate(nethsm): +def test_get_config_get_certificate(nethsm: NetHSM) -> None: """Query the configuration of a NetHSM. For get certificate @@ -159,7 +159,7 @@ def test_get_config_get_certificate(nethsm): assert str_end == "-----END CERTIFICATE-----" -def test_set_backup_passphrase(nethsm): +def test_set_backup_passphrase(nethsm: NetHSM) -> None: """Set the backup passphrase of a NetHSM. This command requires authentication as a user with the Administrator @@ -175,7 +175,7 @@ def test_set_backup_passphrase(nethsm): # @pytest.mark.skip(reason="not finished yet") -def test_set_get_logging_config(nethsm): +def test_set_get_logging_config(nethsm: NetHSM) -> None: """Set the logging configuration of a NetHSM. This command requires authentication as a user with the Administrator @@ -188,7 +188,7 @@ def test_set_get_logging_config(nethsm): # @pytest.mark.skip(reason="not finished yet") -def test_set_get_network_config(nethsm): +def test_set_get_network_config(nethsm: NetHSM) -> None: """Set the network configuration of a NetHSM. This command requires authentication as a user with the Administrator @@ -200,7 +200,7 @@ def test_set_get_network_config(nethsm): get_config_network(nethsm) -def test_set_get_time(nethsm): +def test_set_get_time(nethsm: NetHSM) -> None: """Set the system time of a NetHSM. If the time is not given as an argument, the system time of this system @@ -213,7 +213,7 @@ def test_set_get_time(nethsm): get_config_time(nethsm) -def test_set_get_unattended_boot(nethsm): +def test_set_get_unattended_boot(nethsm: NetHSM) -> None: """Set the unattended boot configuration of a NetHSM. This command requires authentication as a user with the Administrator @@ -234,7 +234,7 @@ def test_set_get_unattended_boot(nethsm): assert str(nethsm.get_config_unattended_boot()) == C.UNATTENDED_BOOT_ON -def test_set_unlock_passphrase_lock_unlock(nethsm): +def test_set_unlock_passphrase_lock_unlock(nethsm: NetHSM) -> None: """Set the unlock passphrase of a NetHSM. This command requires authentication as a user with the Administrator diff --git a/tests/test_nethsm_keys.py b/tests/test_nethsm_keys.py index d6d8ef7..9dee281 100644 --- a/tests/test_nethsm_keys.py +++ b/tests/test_nethsm_keys.py @@ -15,6 +15,7 @@ ) import nethsm as nethsm_module +from nethsm import NetHSM """########## Preparation for the Tests ########## @@ -25,7 +26,7 @@ """ -def add_key(nethsm): +def add_key(nethsm: NetHSM) -> None: """Add a key pair on the NetHSM. If the key ID is not set, it is generated by the NetHSM. @@ -41,7 +42,7 @@ def add_key(nethsm): nethsm.add_key( key_id=C.KEY_ID_ADDED, type=C.TYPE, - mechanisms=C.MECHANISM, + mechanisms=C.MECHANISM, # type: ignore prime_p=p, prime_q=q, public_exponent=e, @@ -50,7 +51,7 @@ def add_key(nethsm): ) -def generate_key_aes(nethsm): +def generate_key_aes(nethsm: NetHSM) -> None: """Add a key pair on the NetHSM. If the key ID is not set, it is generated by the NetHSM. @@ -64,23 +65,23 @@ def generate_key_aes(nethsm): nethsm.generate_key( key_id=C.KEY_ID_AES, type="Generic", - mechanisms=["AES_Encryption_CBC", "AES_Decryption_CBC"], + mechanisms=["AES_Encryption_CBC", "AES_Decryption_CBC"], # type: ignore length=256, ) -def generate_key(nethsm): +def generate_key(nethsm: NetHSM) -> None: """Get information about a key on the NetHSM. This command requires authentication as a user with the Administrator or Operator role.""" try: - nethsm.generate_key(C.TYPE, C.MECHANISM, C.LENGTH, C.KEY_ID_GENERATED) + nethsm.generate_key(C.TYPE, C.MECHANISM, C.LENGTH, C.KEY_ID_GENERATED) # type: ignore except nethsm_module.NetHSMError: pass -def add_key_tags(nethsm): +def add_key_tags(nethsm: NetHSM) -> None: """Add a tag for a key on the NetHSM. This command requires authentication as a user with the Administrator @@ -90,14 +91,14 @@ def add_key_tags(nethsm): nethsm.add_key_tag(key_id=C.KEY_ID_GENERATED, tag=C.TAG3) -def encrypt_data(): +def encrypt_data() -> None: """Todo: encrypt data with python for test_decrypt to work""" """##########Start of Tests##########""" -def test_generate_key(nethsm): +def test_generate_key(nethsm: NetHSM) -> None: """Generate a key pair on the NetHSM. This command requires authentication as a user with the Administrator @@ -105,7 +106,7 @@ def test_generate_key(nethsm): generate_key(nethsm) -def test_add_key(nethsm): +def test_add_key(nethsm: NetHSM) -> None: """Add a key pair on the NetHSM. If the key ID is not set, it is generated by the NetHSM. @@ -115,7 +116,7 @@ def test_add_key(nethsm): add_key(nethsm) -def test_add_get_key_by_public_key(nethsm): +def test_add_get_key_by_public_key(nethsm: NetHSM) -> None: """Get information about a key on the NetHSM. This command requires authentication as a user with the Administrator or @@ -127,7 +128,7 @@ def test_add_get_key_by_public_key(nethsm): nethsm.get_key_public_key(C.KEY_ID_ADDED) -def test_generate_get_key_by_id(nethsm: nethsm_module.NetHSM): +def test_generate_get_key_by_id(nethsm: nethsm_module.NetHSM) -> None: """Get information about a key on the NetHSM. This command requires authentication as a user with the Administrator or @@ -148,7 +149,7 @@ def test_generate_get_key_by_id(nethsm: nethsm_module.NetHSM): assert key.public_exponent -def test_add_key_tag_get_key(nethsm): +def test_add_key_tag_get_key(nethsm: NetHSM) -> None: """Add a tag for a key on the NetHSM. This command requires authentication as a user with the Administrator @@ -158,12 +159,13 @@ def test_add_key_tag_get_key(nethsm): key = nethsm.get_key(C.KEY_ID_GENERATED) tags = key.tags + assert tags assert C.TAG1 in tags assert C.TAG2 in tags assert C.TAG3 in tags -def test_delete_key_tag_get_key(nethsm): +def test_delete_key_tag_get_key(nethsm: NetHSM) -> None: """Delete a tag for a key on the NetHSM. This command requires authentication as a user with the Administrator @@ -175,12 +177,13 @@ def test_delete_key_tag_get_key(nethsm): nethsm.delete_key_tag(key_id=C.KEY_ID_GENERATED, tag=C.TAG2) key = nethsm.get_key(C.KEY_ID_GENERATED) + assert key.tags assert C.TAG1 not in key.tags assert C.TAG2 not in key.tags assert C.TAG3 in key.tags -def test_list_get_keys(nethsm: nethsm_module.NetHSM): +def test_list_get_keys(nethsm: nethsm_module.NetHSM) -> None: """List all keys on the NetHSM. This command requires authentication as a user with the Administrator or @@ -203,7 +206,7 @@ def test_list_get_keys(nethsm: nethsm_module.NetHSM): assert key.public_exponent -def test_delete_key(nethsm): +def test_delete_key(nethsm: NetHSM) -> None: """Delete the key pair with the given key ID on the NetHSM. This command requires authentication as a user with the Administrator @@ -215,7 +218,7 @@ def test_delete_key(nethsm): nethsm.delete_key(C.KEY_ID_ADDED) -def test_set_get_key_certificate(nethsm: nethsm_module.NetHSM): +def test_set_get_key_certificate(nethsm: NetHSM) -> None: add_key(nethsm) with open(C.CERTIFICATE_FILE, "rb") as f: @@ -226,7 +229,7 @@ def test_set_get_key_certificate(nethsm: nethsm_module.NetHSM): assert certificate == file_cert -def test_key_csr(nethsm): +def test_key_csr(nethsm: NetHSM) -> None: add_key(nethsm) @@ -243,7 +246,7 @@ def test_key_csr(nethsm): print(csr) -def test_delete_certificate(nethsm): +def test_delete_certificate(nethsm: NetHSM) -> None: """Delete a certificate for a stored key from the NetHSM. This command requires authentication as a user with the Administrator @@ -259,17 +262,17 @@ def test_delete_certificate(nethsm): nethsm.get_key_certificate(C.KEY_ID_ADDED) -def test_sign(nethsm): # mit dem privaten schlüssel signieren +def test_sign(nethsm: NetHSM) -> None: # mit dem privaten schlüssel signieren """Sign data with a secret key on the NetHSM and print the signature. This command requires authentication as a user with the Operator role.""" generate_key(nethsm) - add_user(nethsm, C.OperatorUser) + add_user(nethsm, C.OPERATOR_USER) key = nethsm.get_key_public_key(C.KEY_ID_GENERATED) hash_object = SHA256.new(data=C.DATA.encode()) - with connect(C.OperatorUser) as nethsm: + with connect(C.OPERATOR_USER) as nethsm: signature = nethsm.sign( C.KEY_ID_GENERATED, base64.b64encode(hash_object.digest()).decode(), @@ -279,17 +282,17 @@ def test_sign(nethsm): # mit dem privaten schlüssel signieren verify_rsa_signature(key, hash_object, base64.b64decode(signature)) -def test_decrypt(nethsm): +def test_decrypt(nethsm: NetHSM) -> None: """Decrypt data with a secret key on the NetHSM and print the decrypted message. This command requires authentication as a user with the Operator role. Todo: encrypt_data() with python for test_decrypt() to work""" generate_key(nethsm) - add_user(nethsm, C.OperatorUser) + add_user(nethsm, C.OPERATOR_USER) key = nethsm.get_key_public_key(C.KEY_ID_GENERATED) encrypted = encrypt_rsa(key, C.DATA) - with connect(C.OperatorUser) as nethsm: + with connect(C.OPERATOR_USER) as nethsm: decrypt = nethsm.decrypt( C.KEY_ID_GENERATED, base64.b64encode(encrypted).decode(), @@ -299,17 +302,17 @@ def test_decrypt(nethsm): assert base64.b64decode(decrypt).decode() == C.DATA -def test_encrypt_decrypt(nethsm): +def test_encrypt_decrypt(nethsm: NetHSM) -> None: generate_key_aes(nethsm) - add_user(nethsm, C.OperatorUser) + add_user(nethsm, C.OPERATOR_USER) IV = Random.new().read(AES.block_size) iv_b64 = base64.b64encode(IV).decode() data_b64 = base64.b64encode(C.DATA.encode()).decode() - with connect(C.OperatorUser) as nethsm: + with connect(C.OPERATOR_USER) as nethsm: encrypted = nethsm.encrypt( C.KEY_ID_AES, diff --git a/tests/test_nethsm_other.py b/tests/test_nethsm_other.py index f1100af..53cfe76 100644 --- a/tests/test_nethsm_other.py +++ b/tests/test_nethsm_other.py @@ -1,9 +1,13 @@ -import docker +from typing import Iterator + +import docker # type: ignore import pytest from conftest import Constants as C from utilities import nethsm # noqa: F401 from utilities import add_user, connect, lock, provision, start_nethsm, unlock +from nethsm import NetHSM + """######################### Preparation for the Tests ######################### To run these test on Ubuntu like systems in Terminal you need sudo rights. @@ -14,14 +18,14 @@ @pytest.fixture(scope="module") -def nethsm_no_provision(): +def nethsm_no_provision() -> Iterator[NetHSM]: """Start Docker container with Nethsm image and connect to Nethsm This Pytest Fixture will run before the tests to provide the tests with a nethsm instance via Docker container""" container = start_nethsm() - with connect(C.AdminUser) as nethsm: + with connect(C.ADMIN_USER) as nethsm: yield nethsm try: @@ -33,13 +37,13 @@ def nethsm_no_provision(): """######################### Start of Tests #########################""" -def test_state(nethsm_no_provision): +def test_state(nethsm_no_provision: NetHSM) -> None: """Query the state of a NetHSM.""" state = nethsm_no_provision.get_state().value assert state in C.STATES -def test_state_provision(nethsm_no_provision): +def test_state_provision(nethsm_no_provision: NetHSM) -> None: """Initial provisioning of a NetHSM. If unlock or admin passphrases are not set, they have to be entered @@ -49,7 +53,7 @@ def test_state_provision(nethsm_no_provision): assert nethsm_no_provision.get_state().value == "Operational" -def test_info(nethsm_no_provision): +def test_info(nethsm_no_provision: NetHSM) -> None: """Query the vendor and product information for a NetHSM.""" (vendor, product) = nethsm_no_provision.get_info() assert nethsm_no_provision.host == C.HOST @@ -57,16 +61,18 @@ def test_info(nethsm_no_provision): assert product == "NetHSM" -def test_state_provision_add_user_metrics_get_metrics(nethsm_no_provision): +def test_state_provision_add_user_metrics_get_metrics( + nethsm_no_provision: NetHSM, +) -> None: """Query the metrics of a NetHSM. This command requires authentication as a user with the Metrics role. Fixme: Asserts True on linux and False on Macos due to lack of a few metrics in metrics""" provision(nethsm_no_provision) - add_user(nethsm_no_provision, C.MetricsUser) + add_user(nethsm_no_provision, C.METRICS_USER) - with connect(C.MetricsUser) as nethsm: + with connect(C.METRICS_USER) as nethsm: data = nethsm.get_metrics() metrics = [ "gc compactions", @@ -87,7 +93,7 @@ def test_state_provision_add_user_metrics_get_metrics(nethsm_no_provision): assert metric in data -def test_state_provision_unlock_lock(nethsm_no_provision): +def test_state_provision_unlock_lock(nethsm_no_provision: NetHSM) -> None: """Bring an operational NetHSM into locked state. This command requires authentication as a user with the Administrator @@ -98,7 +104,7 @@ def test_state_provision_unlock_lock(nethsm_no_provision): lock(nethsm_no_provision) -def test_state_provision_lock_unlock(nethsm_no_provision): +def test_state_provision_lock_unlock(nethsm_no_provision: NetHSM) -> None: """Bring a locked NetHSM into operational state.""" provision(nethsm_no_provision) lock(nethsm_no_provision) @@ -106,14 +112,14 @@ def test_state_provision_lock_unlock(nethsm_no_provision): unlock(nethsm_no_provision, C.UNLOCK_PASSPHRASE) -def test_state_provision_add_user_get_random_data(nethsm_no_provision): +def test_state_provision_add_user_get_random_data(nethsm_no_provision: NetHSM) -> None: """Retrieve random bytes from the NetHSM as a Base64 string. This command requires authentication as a user with the Operator role.""" provision(nethsm_no_provision) - add_user(nethsm_no_provision, C.OperatorUser) + add_user(nethsm_no_provision, C.OPERATOR_USER) - with connect(C.OperatorUser) as nethsm: + with connect(C.OPERATOR_USER) as nethsm: random_data1 = nethsm.get_random_data(100) random_data2 = nethsm.get_random_data(100) random_data3 = nethsm.get_random_data(100) diff --git a/tests/test_nethsm_system.py b/tests/test_nethsm_system.py index 4aaffaa..0e1d4a6 100644 --- a/tests/test_nethsm_system.py +++ b/tests/test_nethsm_system.py @@ -2,7 +2,6 @@ import datetime import os -import docker import pytest from conftest import Constants as C from test_nethsm_keys import generate_key @@ -17,6 +16,7 @@ update, ) +from nethsm import NetHSM from nethsm.backup import Backup, EncryptedBackup """######################### Preparation for the Tests ######################### @@ -31,7 +31,7 @@ """######################### Start of Tests #########################""" -def test_provision_system_info(nethsm): +def test_provision_system_info(nethsm: NetHSM) -> None: """Get system information for a NetHSM instance. This command requires authentication as a user with the Administrator @@ -48,14 +48,14 @@ def test_provision_system_info(nethsm): # fixme: this changes between the NetHSM instances -def test_passphrase_add_user_retrieve_backup(nethsm): +def test_passphrase_add_user_retrieve_backup(nethsm: NetHSM) -> None: """Make a backup of a NetHSM instance and write it to a file. This command requires authentication as a user with the Backup role.""" set_backup_passphrase(nethsm) - add_user(nethsm, C.BackupUser) - add_user(nethsm, C.OperatorUser) + add_user(nethsm, C.BACKUP_USER) + add_user(nethsm, C.OPERATOR_USER) generate_key(nethsm) assert nethsm.list_keys() == [C.KEY_ID_GENERATED] @@ -65,7 +65,7 @@ def test_passphrase_add_user_retrieve_backup(nethsm): with open(C.FILENAME_ENCRYPTED, "wb") as f: f.write(encrypted) - with connect(C.BackupUser) as nethsm: + with connect(C.BACKUP_USER) as nethsm: if os.path.exists(C.FILENAME_BACKUP): os.remove(C.FILENAME_BACKUP) data = nethsm.backup() @@ -79,7 +79,7 @@ def test_passphrase_add_user_retrieve_backup(nethsm): assert False -def test_factory_reset(nethsm): +def test_factory_reset(nethsm: NetHSM) -> None: """Perform a factory reset for a NetHSM instance. This command requires authentication as a user with the Administrator @@ -96,7 +96,7 @@ def test_factory_reset(nethsm): start_nethsm() -def test_state_restore(nethsm): +def test_state_restore(nethsm: NetHSM) -> None: """Restore a backup of a NetHSM instance from a file. If the system time is not set, the current system time is used.""" @@ -118,7 +118,7 @@ def test_state_restore(nethsm): encrypted = f.read() # see test_decrypt in test_nethsm_keys - with connect(C.OperatorUser) as nethsm: + with connect(C.OPERATOR_USER) as nethsm: decrypt = nethsm.decrypt( C.KEY_ID_GENERATED, base64.b64encode(encrypted).decode(), @@ -128,7 +128,7 @@ def test_state_restore(nethsm): assert base64.b64decode(decrypt).decode() == C.DATA -def test_state_provision_update(nethsm): +def test_state_provision_update(nethsm: NetHSM) -> None: """Load an update to a NetHSM instance. This command requires authentication as a user with the Administrator @@ -140,7 +140,7 @@ def test_state_provision_update(nethsm): update(nethsm) -def test_state_provision_update_cancel_update(nethsm): +def test_state_provision_update_cancel_update(nethsm: NetHSM) -> None: """Cancel a queued update on a NetHSM instance. This command requires authentication as a user with the Administrator @@ -153,7 +153,7 @@ def test_state_provision_update_cancel_update(nethsm): nethsm.cancel_update() -def test_update_commit_update(nethsm): +def test_update_commit_update(nethsm: NetHSM) -> None: """Commit a queued update on a NetHSM instance. This command requires authentication as a user with the Administrator @@ -166,7 +166,7 @@ def test_update_commit_update(nethsm): nethsm.commit_update() -def test_provision_reboot(nethsm): +def test_provision_reboot(nethsm: NetHSM) -> None: """Reboot a NetHSM instance. This command requires authentication as a user with the Administrator @@ -178,7 +178,7 @@ def test_provision_reboot(nethsm): nethsm.reboot() -def test_provision_shutdown(nethsm): +def test_provision_shutdown(nethsm: NetHSM) -> None: """Shutdown a NetHSM instance. This command requires authentication as a user with the Administrator diff --git a/tests/test_nethsm_users.py b/tests/test_nethsm_users.py index 6584735..f5a998a 100644 --- a/tests/test_nethsm_users.py +++ b/tests/test_nethsm_users.py @@ -1,9 +1,11 @@ import pytest from conftest import Constants as C +from conftest import UserData from utilities import nethsm # noqa: F401 from utilities import add_user, connect import nethsm as nethsm_module +from nethsm import NetHSM """######################### Preparation for the Tests ######################### @@ -14,7 +16,7 @@ """ -def add_users(nethsm): +def add_users(nethsm: NetHSM) -> None: """Create a new user on the NetHSM. Tests adding every Role. If the real name, role or passphrase are not specified, they have to be @@ -23,46 +25,46 @@ def add_users(nethsm): This command requires authentication as a user with the Administrator role.""" - add_user(nethsm, C.AdministratorUser) - add_user(nethsm, C.OperatorUser) - add_user(nethsm, C.MetricsUser) - add_user(nethsm, C.BackupUser) + add_user(nethsm, C.ADMINISTRATOR_USER) + add_user(nethsm, C.OPERATOR_USER) + add_user(nethsm, C.METRICS_USER) + add_user(nethsm, C.BACKUP_USER) -def delete_users_not_admin(nethsm): +def delete_users_not_admin(nethsm: NetHSM) -> None: user_ids = nethsm.list_users() if len(user_ids) > 1: for user_id in user_ids: - user = nethsm.get_user(user_id=user_id.value) + user = nethsm.get_user(user_id=user_id) if user.user_id != "admin": nethsm.delete_user(user.user_id) -def change_passphrase_with_admin(nethsm, user_id): +def change_passphrase_with_admin(nethsm: NetHSM, user_id: str) -> None: nethsm.set_passphrase(user_id, C.PASSPHRASE_CHANGED) -def login_user_get_state(username): +def login_user_get_state(username: UserData) -> None: with connect(username) as nethsm: nethsm.get_state() -def login_user_with_wrong_passphrase(user): +def login_user_with_wrong_passphrase(user: UserData) -> None: with connect(user) as nethsm: with pytest.raises(nethsm_module.NetHSMError): - nethsm.get_user(user_id=user.USER_ID) + nethsm.get_user(user_id=user.user_id) -def add_operator_tags(nethsm): - nethsm.add_operator_tag(user_id=C.OperatorUser.USER_ID, tag=C.TAG1) - nethsm.add_operator_tag(user_id=C.OperatorUser.USER_ID, tag=C.TAG2) - nethsm.add_operator_tag(user_id=C.OperatorUser.USER_ID, tag=C.TAG3) +def add_operator_tags(nethsm: NetHSM) -> None: + nethsm.add_operator_tag(user_id=C.OPERATOR_USER.user_id, tag=C.TAG1) + nethsm.add_operator_tag(user_id=C.OPERATOR_USER.user_id, tag=C.TAG2) + nethsm.add_operator_tag(user_id=C.OPERATOR_USER.user_id, tag=C.TAG3) """######################### Start of Tests #########################""" -def test_list_get_delete_add_users(nethsm): +def test_list_get_delete_add_users(nethsm: NetHSM) -> None: delete_users_not_admin(nethsm) add_users(nethsm) @@ -72,28 +74,28 @@ def test_list_get_delete_add_users(nethsm): user = nethsm.get_user(user_id=user_id) for i in range(len(remaining)): - if user.user_id == remaining[i].USER_ID: - assert user.real_name == remaining[i].REAL_NAME - assert user.role.value == remaining[i].ROLE + if user.user_id == remaining[i].user_id: + assert user.real_name == remaining[i].real_name + assert user.role.value == remaining[i].role remaining.pop(i) break assert remaining == [] -def test_get_user_admin(nethsm): +def test_get_user_admin(nethsm: NetHSM) -> None: """Query the real name and role for a user ID on the NetHSM. This command requires authentication as a user with the Administrator or Operator role.""" - user = nethsm.get_user(user_id=C.AdminUser.USER_ID) - assert user.user_id == C.AdminUser.USER_ID - assert user.real_name == C.AdminUser.REAL_NAME - assert user.role.value == C.AdminUser.ROLE + user = nethsm.get_user(user_id=C.ADMIN_USER.user_id) + assert user.user_id == C.ADMIN_USER.user_id + assert user.real_name == C.ADMIN_USER.real_name + assert user.role.value == C.ADMIN_USER.role # @pytest.mark.xfail(reason="connect() doesn't require correct passphrase yet") -def test_add_users_set_passphrases_connect(nethsm): +def test_add_users_set_passphrases_connect(nethsm: NetHSM) -> None: """Set the passphrase for the user with the given ID (or the current user). This command requires authentication as a user with the Administrator or @@ -106,88 +108,88 @@ def test_add_users_set_passphrases_connect(nethsm): add_users(nethsm) """Set with a Administrator User the passphrase of other users""" - change_passphrase_with_admin(nethsm, C.AdministratorUser.USER_ID) - change_passphrase_with_admin(nethsm, C.BackupUser.USER_ID) - change_passphrase_with_admin(nethsm, C.MetricsUser.USER_ID) - change_passphrase_with_admin(nethsm, C.OperatorUser.USER_ID) + change_passphrase_with_admin(nethsm, C.ADMINISTRATOR_USER.user_id) + change_passphrase_with_admin(nethsm, C.BACKUP_USER.user_id) + change_passphrase_with_admin(nethsm, C.METRICS_USER.user_id) + change_passphrase_with_admin(nethsm, C.OPERATOR_USER.user_id) """Login with every user with correct passphrase""" - login_user_get_state(C.AdministratorUser) - login_user_get_state(C.BackupUser) - login_user_get_state(C.MetricsUser) - login_user_get_state(C.OperatorUser) + login_user_get_state(C.ADMINISTRATOR_USER) + login_user_get_state(C.BACKUP_USER) + login_user_get_state(C.METRICS_USER) + login_user_get_state(C.OPERATOR_USER) """Login with every user with incorrect passphrase Works not as intended because of this test implementation""" - login_user_with_wrong_passphrase(C.AdministratorUser) - login_user_with_wrong_passphrase(C.BackupUser) - login_user_with_wrong_passphrase(C.MetricsUser) - login_user_with_wrong_passphrase(C.OperatorUser) + login_user_with_wrong_passphrase(C.ADMINISTRATOR_USER) + login_user_with_wrong_passphrase(C.BACKUP_USER) + login_user_with_wrong_passphrase(C.METRICS_USER) + login_user_with_wrong_passphrase(C.OPERATOR_USER) "Set with Operator user the passphrase of another user" - with connect(C.OperatorUser) as nethsm: + with connect(C.OPERATOR_USER) as nethsm: with pytest.raises(nethsm_module.NetHSMError): - nethsm.set_passphrase(C.MetricsUser.USER_ID, C.PASSPHRASE_CHANGED) + nethsm.set_passphrase(C.METRICS_USER.user_id, C.PASSPHRASE_CHANGED) "Set with another user which do not have Administrator or Operator Role" - with connect(C.BackupUser) as nethsm: + with connect(C.BACKUP_USER) as nethsm: with pytest.raises(nethsm_module.NetHSMError): - nethsm.set_passphrase(C.BackupUser.USER_ID, C.PASSPHRASE_CHANGED) + nethsm.set_passphrase(C.BACKUP_USER.user_id, C.PASSPHRASE_CHANGED) -def test_add_delete_user_administrator(nethsm): +def test_add_delete_user_administrator(nethsm: NetHSM) -> None: """Delete the user with the given user ID on the NetHSM. This command requires authentication as a user with the Administrator role.""" try: - add_user(nethsm, C.AdministratorUser) + add_user(nethsm, C.ADMINISTRATOR_USER) except nethsm_module.NetHSMError: pass - nethsm.delete_user(C.AdministratorUser.USER_ID) + nethsm.delete_user(C.ADMINISTRATOR_USER.user_id) with pytest.raises(nethsm_module.NetHSMError): - nethsm.get_user(user_id=C.AdministratorUser.USER_ID) + nethsm.get_user(user_id=C.ADMINISTRATOR_USER.user_id) -def test_add_operator_tags(nethsm): +def test_add_operator_tags(nethsm: NetHSM) -> None: """Add a tag for an operator user on the NetHSM. This command requires authentication as a user with the Administrator role.""" try: - add_user(nethsm, C.OperatorUser) + add_user(nethsm, C.OPERATOR_USER) except nethsm_module.NetHSMError: pass add_operator_tags(nethsm) -def test_add_list_operator_tags(nethsm): +def test_add_list_operator_tags(nethsm: NetHSM) -> None: """List the tags for an operator user ID on the NetHSM. This command requires authentication as a user with the Administrator role.""" try: - add_user(nethsm, C.OperatorUser) + add_user(nethsm, C.OPERATOR_USER) except nethsm_module.NetHSMError: pass try: add_operator_tags(nethsm) except nethsm_module.NetHSMError: pass - tags = nethsm.list_operator_tags(user_id=C.OperatorUser.USER_ID) + tags = nethsm.list_operator_tags(user_id=C.OPERATOR_USER.user_id) if tags: for tag in tags: assert str(tag) in C.TAGS -def test_add_delete_list_operator_tags(nethsm): +def test_add_delete_list_operator_tags(nethsm: NetHSM) -> None: """Delete a tag for an operator user on the NetHSM. This command requires authentication as a user with the Administrator role.""" try: - add_user(nethsm, C.OperatorUser) + add_user(nethsm, C.OPERATOR_USER) except nethsm_module.NetHSMError: pass try: @@ -195,7 +197,7 @@ def test_add_delete_list_operator_tags(nethsm): except nethsm_module.NetHSMError: pass - nethsm.delete_operator_tag(user_id=C.OperatorUser.USER_ID, tag=C.TAG1) - nethsm.delete_operator_tag(user_id=C.OperatorUser.USER_ID, tag=C.TAG2) - tag = nethsm.list_operator_tags(user_id=C.OperatorUser.USER_ID) + nethsm.delete_operator_tag(user_id=C.OPERATOR_USER.user_id, tag=C.TAG1) + nethsm.delete_operator_tag(user_id=C.OPERATOR_USER.user_id, tag=C.TAG2) + tag = nethsm.list_operator_tags(user_id=C.OPERATOR_USER.user_id) assert C.TAG1 not in tag and C.TAG2 not in tag diff --git a/tests/utilities.py b/tests/utilities.py index e944dd0..07e47ae 100644 --- a/tests/utilities.py +++ b/tests/utilities.py @@ -4,11 +4,13 @@ import os import subprocess from time import sleep +from typing import Iterator -import docker +import docker # type: ignore import pytest import urllib3 from conftest import Constants as C +from conftest import UserData from Crypto.Cipher import PKCS1_v1_5 as PKCS115_Cipher from Crypto.Hash import SHA256 from Crypto.PublicKey import RSA @@ -20,10 +22,11 @@ from cryptography.x509.oid import NameOID import nethsm as nethsm_module +from nethsm import NetHSM @pytest.fixture(scope="module") -def nethsm(): +def nethsm() -> Iterator[NetHSM]: """Start Docker container with Nethsm image and connect to Nethsm This Pytest Fixture will run before the tests to provide the tests with @@ -32,7 +35,7 @@ def nethsm(): container = start_nethsm() - with connect(C.AdminUser) as nethsm: + with connect(C.ADMIN_USER) as nethsm: provision(nethsm) yield nethsm @@ -43,15 +46,15 @@ def nethsm(): class KeyfenderManager: - def __init__(): - ... + def __init__(self) -> None: + pass - def kill(): - ... + def kill(self) -> None: + pass class KeyfenderDockerManager(KeyfenderManager): - def __init__(self): + def __init__(self) -> None: client = docker.from_env() while True: @@ -82,7 +85,7 @@ def __init__(self): ) self.container = container - def kill(self): + def kill(self) -> None: try: self.container.kill() except docker.errors.APIError: @@ -90,7 +93,7 @@ def kill(self): class KeyfenderCIManager(KeyfenderManager): - def __init__(self): + def __init__(self) -> None: os.system("pkill keyfender.unix") os.system("pkill etcd") @@ -107,12 +110,12 @@ def __init__(self): ] ) - def kill(self): + def kill(self) -> None: self.process.kill() -def start_nethsm(): - +def start_nethsm() -> KeyfenderManager: + context: KeyfenderManager if C.TEST_MODE == "docker": context = KeyfenderDockerManager() elif C.TEST_MODE == "ci": @@ -137,14 +140,14 @@ def start_nethsm(): @contextlib.contextmanager -def connect(username): +def connect(user: UserData) -> Iterator[NetHSM]: with nethsm_module.connect( - C.HOST, C.VERSION, username.USER_ID, C.PASSWORD, C.VERIFY_TLS + C.HOST, C.VERSION, user.user_id, C.PASSWORD, C.VERIFY_TLS ) as nethsm_out: yield nethsm_out -def provision(nethsm): +def provision(nethsm: NetHSM) -> None: """Initial provisioning of a NetHSM. If unlock or admin passphrases are not set, they have to be entered @@ -155,7 +158,7 @@ def provision(nethsm): nethsm.provision("unlockunlock", "adminadmin", system_time) -def add_user(nethsm, username): +def add_user(nethsm: NetHSM, user: UserData) -> None: """Create a new user on the NetHSM. If the real name, role or passphrase are not specified, they have to be @@ -165,50 +168,50 @@ def add_user(nethsm, username): This command requires authentication as a user with the Administrator role.""" try: - nethsm.get_user(user_id=username.USER_ID) + nethsm.get_user(user_id=user.user_id) except nethsm_module.NetHSMError: - nethsm.add_user( - username.REAL_NAME, username.ROLE, C.PASSPHRASE, username.USER_ID - ) + nethsm.add_user(user.real_name, user.role, C.PASSPHRASE, user.user_id) -def generate_rsa_key_pair(length_in_bit): +def generate_rsa_key_pair(length_in_bit: int) -> tuple[str, str, str]: key_pair = RSA.generate(length_in_bit) length_in_byte = int(length_in_bit / 8) # "big" byteorder is needed, it's the dominant order in networking p = base64.b64encode(key_pair.p.to_bytes(length_in_byte, "big")) q = base64.b64encode(key_pair.q.to_bytes(length_in_byte, "big")) e = base64.b64encode(key_pair.e.to_bytes(length_in_byte, "big")) - p = str(p, "utf-8").strip() - q = str(q, "utf-8").strip() - e = str(e, "utf-8").strip() - return p, q, e + ps = str(p, "utf-8").strip() + qs = str(q, "utf-8").strip() + es = str(e, "utf-8").strip() + return ps, qs, es -def verify_rsa_signature(public_key: str, message: SHA256.SHA256Hash, signature: bytes): +def verify_rsa_signature( + public_key: str, message: SHA256.SHA256Hash, signature: bytes +) -> bool: key = RSA.importKey(public_key) return PKCS1_PSS.new(key).verify(message, signature) -def encrypt_rsa(public_key: str, message: str): - public_key = RSA.importKey(public_key) - cipher = PKCS115_Cipher.new(public_key) +def encrypt_rsa(public_key: str, message: str) -> bytes: + key = RSA.importKey(public_key) + cipher = PKCS115_Cipher.new(key) return cipher.encrypt(bytes(message, "utf-8")) -def lock(nethsm): +def lock(nethsm: NetHSM) -> None: if nethsm.get_state().value == "Operational": nethsm.lock() assert nethsm.get_state().value == "Locked" -def unlock(nethsm, unlock_passphrase): +def unlock(nethsm: NetHSM, unlock_passphrase: str) -> None: if nethsm.get_state().value == "Locked": nethsm.unlock(unlock_passphrase) assert nethsm.get_state().value == "Operational" -def set_backup_passphrase(nethsm): +def set_backup_passphrase(nethsm: NetHSM) -> None: """Set the backup passphrase of a NetHSM. This command requires authentication as a user with the Administrator @@ -217,7 +220,7 @@ def set_backup_passphrase(nethsm): nethsm.set_backup_passphrase(C.BACKUP_PASSPHRASE) -def update(nethsm): +def update(nethsm: NetHSM) -> None: """Load an update to a NetHSM instance. This command requires authentication as a user with the Administrator @@ -234,7 +237,7 @@ def update(nethsm): assert False -def self_sign_csr(csr: str): +def self_sign_csr(csr: str) -> bytes: # Generate a private key private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) parsed_csr = x509.load_pem_x509_csr(csr.encode("utf-8")) @@ -256,7 +259,7 @@ def self_sign_csr(csr: str): x509.SubjectKeyIdentifier.from_public_key(public_key), critical=False ) .add_extension( - x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key), + x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key), # type: ignore critical=False, ) .sign(private_key, hashes.SHA256())