Skip to content

Commit

Permalink
refact, test: initial work on reworking signing, add set expiration d…
Browse files Browse the repository at this point in the history
…ate test
  • Loading branch information
renatav committed Nov 8, 2024
1 parent abeaa2c commit f3bc1f7
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 200 deletions.
11 changes: 6 additions & 5 deletions taf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from logdecorator import log_on_end, log_on_error
from taf.api.utils._git import check_if_clean
from taf.exceptions import TAFError
from taf.keys import load_signing_keys
from taf.keys import load_signers, load_signing_keys
from taf.constants import DEFAULT_RSA_SIGNATURE_SCHEME
from taf.messages import git_commit_message
from taf.repository_tool import Repository, is_delegated_role
Expand Down Expand Up @@ -118,6 +118,7 @@ def update_metadata_expiration_date(
Returns:
None
"""
# TODO move tis to the auth repo class
if start_date is None:
start_date = datetime.now()

Expand Down Expand Up @@ -177,7 +178,7 @@ def _update_expiration_date_of_role(
scheme: str,
prompt_for_keys: bool,
) -> None:
keys, yubikeys = load_signing_keys(
keystore_signers, yubikeys = load_signers(
auth_repo,
role,
loaded_yubikeys=loaded_yubikeys,
Expand All @@ -186,9 +187,9 @@ def _update_expiration_date_of_role(
prompt_for_keys=prompt_for_keys,
)
# sign with keystore
if len(keys):
auth_repo.update_role_keystores(
role, keys, start_date=start_date, interval=interval
if len(keystore_signers):
auth_repo.set_metadata_expiration_date(
role, keystore_signers, start_date=start_date, interval=interval
)
if len(yubikeys): # sign with yubikey
auth_repo.update_role_yubikeys(
Expand Down
34 changes: 20 additions & 14 deletions taf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from taf.models.types import TargetsRole, MainRoles, UserKeyData
from taf.repository_tool import Repository
from taf.api.utils._conf import find_keystore
from taf.tuf.keys import load_signer_from_pem
from tuf.repository_tool import (
generate_and_write_unencrypted_rsa_keypair,
generate_and_write_rsa_keypair,
Expand All @@ -24,7 +25,7 @@
from taf.keystore import (
get_keystore_keys_of_role,
key_cmd_prompt,
read_private_key_from_keystore,
load_signer_from_private_keystore,
read_public_key_from_keystore,
)
from taf import YubikeyMissingLibrary
Expand All @@ -34,6 +35,9 @@
import_rsa_publickey_from_file,
)

from securesystemslib.signer._crypto_signer import CryptoSigner


try:
import taf.yubikey as yk
except ImportError:
Expand Down Expand Up @@ -193,31 +197,31 @@ def _generate_public_key_from_private(keystore_path, key_name, scheme):
return None


def _load_from_keystore(
def _load_signer_from_keystore(
taf_repo, keystore_path, key_name, num_of_signatures, scheme, role
):
) -> CryptoSigner:
if keystore_path is None:
return None
if (keystore_path / key_name).is_file():
try:
key = read_private_key_from_keystore(
keystore_path, key_name, num_of_signatures, scheme
signer = load_signer_from_private_keystore(
keystore=keystore_path, key_name=key_name, scheme=scheme
)
# load only valid keys
if taf_repo.is_valid_metadata_key(role, key, scheme=scheme):
if taf_repo.is_valid_metadata_key(role, signer._private_key, scheme=scheme):
# Check if the public key is missing and generate it if necessary
public_key_path = keystore_path / f"{key_name}.pub"
if not public_key_path.exists():
_generate_public_key_from_private(keystore_path, key_name, scheme)
return key
return signer
except KeystoreError:
pass

return None


@log_on_start(INFO, "Loading signing keys of '{role:s}'", logger=taf_logger)
def load_signing_keys(
def load_signers(
taf_repo: Repository,
role: str,
loaded_yubikeys: Optional[Dict],
Expand All @@ -234,7 +238,7 @@ def load_signing_keys(
signing_keys_num = len(taf_repo.get_role_keys(role))
all_loaded = False
num_of_signatures = 0
keys = []
signers_keystore = []
yubikeys = []

# first try to sign using yubikey
Expand Down Expand Up @@ -279,11 +283,11 @@ def _load_and_append_yubikeys(
# there is no need to ask the user if they want to load more key, try to load from keystore
if num_of_signatures < len(keystore_files):
key_name = keystore_files[num_of_signatures]
key = _load_from_keystore(
signer = _load_signer_from_keystore(
taf_repo, keystore_path, key_name, num_of_signatures, scheme, role
)
if key is not None:
keys.append(key)
if signer is not None:
signers_keystore.append(key)
num_of_signatures += 1
continue
if num_of_signatures >= threshold:
Expand Down Expand Up @@ -313,13 +317,15 @@ def _load_and_append_yubikeys(
continue

if prompt_for_keys and click.confirm(f"Manually enter {role} key?"):
keys = [signer.public_key for signer in signers_keystore]
key = key_cmd_prompt(key_name, role, taf_repo, keys, scheme)
keys.append(key)
signer = load_signer_from_pem(key)
signers_keystore.append(key)
num_of_signatures += 1
else:
raise SigningError(f"Cannot load keys of role {role}")

return keys, yubikeys
return signers_keystore, yubikeys


def setup_roles_keys(
Expand Down
30 changes: 13 additions & 17 deletions taf/keystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@
import click
import securesystemslib
from securesystemslib.interface import (
import_rsa_privatekey_from_file,
import_rsa_publickey_from_file,
)
from taf.repository_tool import Repository
from taf.tuf.keys import load_signer_from_file
from tuf.repository_tool import import_rsakey_from_pem

from taf.constants import DEFAULT_RSA_SIGNATURE_SCHEME
from taf.exceptions import KeystoreError

from securesystemslib.securesystemslib.signer._crypto_signer import CryptoSigner


def default_keystore_path() -> str:
keystore_path = str(Path(getcwd(), "keystore"))
Expand Down Expand Up @@ -63,15 +65,15 @@ def _enter_and_check_key(key_name, role, loaded_keys, scheme):
if not taf_repo.is_valid_metadata_yubikey(role, public_key):
print(f"The entered key is not a valid {role} key")
return None
if loaded_keys is not None and key in loaded_keys:
if loaded_keys is not None and public_key in loaded_keys:
print("Key already entered")
return None
return key
return pem

while True:
key = _enter_and_check_key(key_name, role, loaded_keys, scheme)
if key is not None:
return key
pem = _enter_and_check_key(key_name, role, loaded_keys, scheme)
if pem is not None:
return pem


def load_tuf_private_key(
Expand Down Expand Up @@ -103,29 +105,23 @@ def _enter_and_check_key(scheme):
return key


def read_private_key_from_keystore(
def load_signer_from_private_keystore(
keystore: str,
key_name: str,
key_num: Optional[int] = None,
scheme: Optional[str] = DEFAULT_RSA_SIGNATURE_SCHEME,
password: Optional[str] = None,
) -> Dict:
) -> CryptoSigner:
key_path = Path(keystore, key_name).expanduser().resolve()
if not key_path.is_file():
raise KeystoreError(f"{str(key_path)} does not exist")

def _read_key(path, password, scheme):
def _read_key_or_keystore_error(path, password, scheme):
try:
return import_rsa_privatekey_from_file(
str(Path(keystore, key_name)), password or None, scheme=scheme
return load_signer_from_file(
path, password or None, scheme=scheme
)
except (
securesystemslib.exceptions.FormatError,
securesystemslib.exceptions.Error,
) as e:
if "password" in str(e).lower():
return None
except Exception as e:
raise KeystoreError(e)

try:
Expand Down
Loading

0 comments on commit f3bc1f7

Please sign in to comment.