Skip to content

Commit

Permalink
fix, refact: init conf fixes, rework add multiple roles
Browse files Browse the repository at this point in the history
  • Loading branch information
renatav committed Oct 11, 2024
1 parent 904c26a commit 1187f5b
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 128 deletions.
91 changes: 49 additions & 42 deletions taf/api/conf.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from shutil import Error, copytree
import shutil
from typing import Optional
from pathlib import Path
from taf.api.keystore import generate_keys
from taf.log import taf_logger
from taf.utils import read_input_dict


def init(
Expand Down Expand Up @@ -32,7 +34,13 @@ def init(
keystore_directory.mkdir(exist_ok=True)

# If any of these parameters exist you can assume the user wants to generate keys
if not keystore and not roles_key_infos:

# check if keystore already exists
roles_key_infos_dict = read_input_dict(roles_key_infos)
keystore = keystore or (roles_key_infos and roles_key_infos_dict.get("keystore"))
should_generate_keys = False
keystore_path = Path(keystore) if keystore else None
if not keystore or not keystore_path.is_dir():
# Prompt the user if they want to run the generate_keys function
while True:
use_keystore = (
Expand All @@ -43,50 +51,49 @@ def init(
if use_keystore in ["y", "n"]:
should_generate_keys = use_keystore == "y"
break
if should_generate_keys or (keystore and not roles_key_infos):
# First check if the user already specified keystore
if not keystore:
copy_keystore = (
input(
"Do you want to load an existing keystore from another location? [y/N]: "
)
.strip()
.lower()
)
if copy_keystore == "y":
while True:
keystore_input = input(
"Enter the path to the existing keystore:"
).strip()
keystore_path = Path(keystore_input)
if keystore_path.exists() and keystore_path.is_dir():
keystore = keystore_input # Assign the string path to the keystore variable
break
else:
taf_logger.error(
f"Provided keystore path {keystore} is invalid."
)
# Check if keystore is specified now. If so copy the keys
if keystore:
try:
copytree(keystore, keystore_directory, dirs_exist_ok=True)
taf_logger.info(
f"Copied keystore from {keystore} to {keystore_directory}"

if should_generate_keys:
# First check if the user already specified keystore
if not keystore:
copy_keystore = (
input(
"Do you want to load an existing keystore from another location? [y/N]: "
)
.strip()
.lower()
)
except FileNotFoundError:
taf_logger.error(f"Provided keystore path {keystore} not found.")
except Error as e:
taf_logger.error(f"Error occurred while copying keystore: {e}")
if copy_keystore == "y":
while True:
keystore_input = input(
"Enter the path to the existing keystore:"
).strip()
keystore_path = Path(keystore_input)
if keystore_path.exists() and keystore_path.is_dir():
keystore = keystore_input # Assign the string path to the keystore variable
should_generate_keys = (
False # no need to generate keys, they will be copied
)
break
else:
taf_logger.error(
f"Provided keystore path {keystore} is invalid."
)
# Check if keystore is specified now. If so copy the keys
if keystore and keystore_path.is_dir():
try:
copytree(keystore, keystore_directory, dirs_exist_ok=True)
taf_logger.info(f"Copied keystore from {keystore} to {keystore_directory}")
except FileNotFoundError:
taf_logger.error(f"Provided keystore path {keystore} not found.")
except Error as e:
taf_logger.error(f"Error occurred while copying keystore: {e}")

# If there is no keystore path specified, ask for keys description and generate keys
elif not roles_key_infos:
roles_key_infos = input(
"Enter the path to the keys description JSON file (can be left empty): "
).strip()
if not roles_key_infos:
roles_key_infos = "."
if roles_key_infos:
if should_generate_keys:
generate_keys(keystore_directory, roles_key_infos)
taf_logger.info(
f"Successfully generated keys inside the {keystore_directory} directory"
)

if roles_key_infos is not None and Path(roles_key_infos).is_file():
infos_config_path = (taf_directory / Path(roles_key_infos).name).absolute()
shutil.copy(str(roles_key_infos), str(infos_config_path))
57 changes: 36 additions & 21 deletions taf/api/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
from taf.api.utils._git import check_if_clean, commit_and_push
from taf.exceptions import KeystoreError, TAFError
from taf.models.converter import from_dict
from taf.models.types import RolesIterator, TargetsRole
from taf.models.types import RolesIterator, TargetsRole, compare_roles_data
from taf.repositoriesdb import REPOSITORIES_JSON_PATH
from tuf.repository_tool import TARGETS_DIRECTORY_NAME
import tuf.roledb
import taf.repositoriesdb as repositoriesdb
from taf.keys import (
find_keystore,
get_key_name,
get_metadata_key_info,
load_signing_keys,
Expand Down Expand Up @@ -216,7 +217,7 @@ def add_role_paths(
reraise=True,
)
@check_if_clean
def add_roles(
def add_multiple_roles(
path: str,
keystore: Optional[str] = None,
roles_key_infos: Optional[str] = None,
Expand All @@ -227,7 +228,8 @@ def add_roles(
) -> None:
"""
Add new target roles and sign all metadata files given information stored in roles_key_infos
dictionary or .json file
dictionary or .json file.
Arguments:
path: Path to the authentication repository.
Expand All @@ -246,22 +248,13 @@ def add_roles(
"""
auth_repo = AuthenticationRepository(path=path)

roles_key_infos_dict, keystore, _ = _initialize_roles_and_keystore(
roles_key_infos, keystore
roles_keys_data_new = _initialize_roles_and_keystore_for_existing_repo(
path, roles_key_infos, keystore
)
roles_data = auth_repo.generate_roles_description()
roles_keys_data_current = from_dict(roles_data, RolesKeysData)

roles_keys_data = from_dict(roles_key_infos_dict, RolesKeysData)

new_roles = []
existing_roles = auth_repo.get_all_targets_roles()
main_roles = ["root", "snapshot", "timestamp"]
existing_roles.extend(main_roles)

new_roles = [
role
for role in RolesIterator(roles_keys_data.roles.targets, skip_top_role=True)
if role.name not in existing_roles
]
new_roles, _ = compare_roles_data(roles_keys_data_current, roles_keys_data_new)

parent_roles_names = {role.parent.name for role in new_roles}

Expand All @@ -270,20 +263,23 @@ def add_roles(
return

repository = auth_repo._repository
existing_roles = [
role.name for role in RolesIterator(roles_keys_data_current.roles)
]
signing_keys, verification_keys = load_sorted_keys_of_new_roles(
auth_repo=auth_repo,
roles=roles_keys_data.roles,
roles=roles_keys_data_new.roles,
keystore=keystore,
yubikeys_data=roles_keys_data.yubikeys,
yubikeys_data=roles_keys_data_new.yubikeys,
existing_roles=existing_roles,
)

create_delegations(
roles_keys_data.roles.targets,
roles_keys_data_new.roles.targets,
repository,
verification_keys,
signing_keys,
existing_roles,
existing_roles=existing_roles,
)
for parent_role_name in parent_roles_names:
_update_role(
Expand Down Expand Up @@ -541,6 +537,24 @@ def _read_val(input_type, name, param=None, required=False):
pass


def _initialize_roles_and_keystore_for_existing_repo(
path: str,
roles_key_infos: Optional[str],
keystore: Optional[str],
enter_info: Optional[bool] = True,
) -> RolesKeysData:
roles_key_infos_dict = read_input_dict(roles_key_infos)

if not roles_key_infos_dict and enter_info:
roles_key_infos_dict = _enter_roles_infos(None, roles_key_infos)
roles_keys_data = from_dict(roles_key_infos_dict, RolesKeysData)
keystore = keystore or roles_keys_data.keystore
if keystore is None:
keystore = find_keystore(path)
roles_keys_data.keystore = keystore
return roles_keys_data


def _initialize_roles_and_keystore(
roles_key_infos: Optional[str],
keystore: Optional[str],
Expand All @@ -562,6 +576,7 @@ def _initialize_roles_and_keystore(
enter_info (optional): Indicates if the user should be asked to enter information about the
roles and keys if not specified. Set to True by default.
Side Effects:
None
Expand Down
23 changes: 23 additions & 0 deletions taf/models/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,26 @@ def _dfs_delegations(role, skip_top_role=False):

for role in roles:
yield from _dfs_delegations(role, self.skip_top_role)


def compare_roles_data(old_data: RolesKeysData, new_data: RolesKeysData):
added_roles = []
removed_roles = []
current_roles = {
role.name: role
for role in RolesIterator(old_data.roles.targets, skip_top_role=True)
}
new_roles = {
role.name: role
for role in RolesIterator(new_data.roles.targets, skip_top_role=True)
}

for role_name, role in current_roles.items():
if role_name not in new_roles:
removed_roles.append(role)

for role_name, role in new_roles.items():
if role_name not in current_roles:
added_roles.append(role)

return added_roles, removed_roles
75 changes: 10 additions & 65 deletions taf/tools/roles/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pathlib import Path
import sys
import click
from taf.api.roles import add_role, add_roles, list_keys_of_role, add_signing_key
from taf.api.roles import add_multiple_roles, add_role, list_keys_of_role, add_signing_key
from taf.constants import DEFAULT_RSA_SIGNATURE_SCHEME
from taf.exceptions import TAFError
from taf.auth_repo import AuthenticationRepository
Expand Down Expand Up @@ -106,13 +106,12 @@ def export_roles_description(path, output, keystore):
return export_roles_description


def update_roles_command():
@click.command(help="""Add or update roles based on the provided keys-description file.
This file is expected to contain information about all roles this command updates
the repository based on the discrepencies between information listed in that file
and the current metadata.
def add_multiple_command():
@click.command(help="""Adds new roles based on the provided keys-description file by
comparing it with the current state of the repository.
The current state can be exported using taf roles export_roles_description
The current state can be exported using taf roles export_roles_description and then
edited manually to add new roles.
For each role, the following can be defined:
- Total number of keys per role.
Expand All @@ -121,10 +120,6 @@ def update_roles_command():
- Signature scheme, with the default being 'rsa-pkcs1v15-sha256'.
- Keystore path, if not specified via the keystore option.
This command facilitates the addition of new roles or the updating of existing roles according to the provided specifications.
New roles are automatically detected and integrated. Currently, the removal of roles is not supported.
It is possible to add new delegated paths and update other properties of the roles.
\b
Example of a JSON configuration:
{
Expand Down Expand Up @@ -153,16 +148,16 @@ def update_roles_command():
@click.option("--scheme", default=DEFAULT_RSA_SIGNATURE_SCHEME, help="A signature scheme used for signing")
@click.option("--no-commit", is_flag=True, default=False, help="Indicates that the changes should not be committed automatically")
@click.option("--prompt-for-keys", is_flag=True, default=False, help="Whether to ask the user to enter their key if not located inside the keystore directory")
def update_roles(path, keystore, keys_description, scheme, no_commit, prompt_for_keys):
add_roles(
def add_multiple(path, keystore, keys_description, scheme, no_commit, prompt_for_keys):
add_multiple_roles(
path=path,
keystore=keystore,
roles_key_infos=keys_description,
scheme=scheme,
prompt_for_keys=prompt_for_keys,
commit=not no_commit,
)
return update_roles
return add_multiple


def add_role_paths_command():
Expand Down Expand Up @@ -192,56 +187,6 @@ def adding_role_paths(role, path, delegated_path, keystore, no_commit, prompt_fo
return adding_role_paths


def add_multiple_roles_command():
@click.command(help="""Add one or more target roles. Information about the roles
can be provided through a dictionary - either specified directly or contained
by a .json file whose path is specified when calling this command. This allows
definition of:
- total number of keys per role
- threshold of signatures per role
- should keys of a role be on Yubikeys or should keystore files be used
- scheme (the default scheme is rsa-pkcs1v15-sha256)
- keystore path, if not specified via keystore option
\b
For example:
{
"roles": {
"root": {
"number": 3,
"length": 2048,
"passwords": ["password1", "password2", "password3"],
"threshold": 2,
"yubikey": true
},
"targets": {
"length": 2048
},
"snapshot": {},
"timestamp": {}
},
"keystore": "keystore_path"
}
""")
@find_repository
@catch_cli_exception(handle=TAFError)
@click.option("--path", default=".", help="Authentication repository's location. If not specified, set to the current directory")
@click.argument("keys-description")
@click.option("--keystore", default=None, help="Location of the keystore files")
@click.option("--scheme", default=DEFAULT_RSA_SIGNATURE_SCHEME, help="A signature scheme used for signing")
@click.option("--no-commit", is_flag=True, default=False, help="Indicates that the changes should not be committed automatically")
@click.option("--prompt-for-keys", is_flag=True, default=False, help="Whether to ask the user to enter their key if not located inside the keystore directory")
def add_multiple(path, keystore, keys_description, scheme, no_commit, prompt_for_keys):
add_roles(
path=path,
keystore=keystore,
roles_key_infos=keys_description,
scheme=scheme,
prompt_for_keys=prompt_for_keys,
commit=not no_commit,
)
return add_multiple


# commenting out this command since its execution leads to an invalid state
# this is a TUF bug (or better said, caused by using a newer version of the updater and old repository_tool)
Expand Down Expand Up @@ -333,7 +278,7 @@ def list_keys(role, path):
def attach_to_group(group):

group.add_command(add_role_command(), name='add')
group.add_command(add_multiple_roles_command(), name='add-multiple')
group.add_command(add_multiple_command(), name='add-multiple')
group.add_command(add_role_paths_command(), name='add-role-paths')
# group.add_command(remove_role_command(), name='remove')
group.add_command(add_signing_key_command(), name='add-signing-key')
Expand Down

0 comments on commit 1187f5b

Please sign in to comment.