From 1f5c811d0638559afa98adc438fa32ab13cb6c36 Mon Sep 17 00:00:00 2001 From: Vignesh Rao Date: Mon, 16 Sep 2024 10:01:15 -0500 Subject: [PATCH] Encrypt secrets when stored in database Include a complexity checker for API key and secret --- .gitignore | 5 ++- decryptor.py | 28 -------------- encryptor.py | 4 -- pyproject.toml | 2 +- vaultapi/main.py | 3 +- vaultapi/models.py | 93 +++++++++++++++++++++++++++++++++++++++++++-- vaultapi/payload.py | 2 +- vaultapi/routers.py | 12 ++++-- 8 files changed, 106 insertions(+), 43 deletions(-) delete mode 100644 decryptor.py delete mode 100644 encryptor.py diff --git a/.gitignore b/.gitignore index 2728d12..45dc406 100644 --- a/.gitignore +++ b/.gitignore @@ -161,6 +161,7 @@ cython_debug/ # option (not recommended) you can uncomment the following to ignore the entire idea folder. .idea/ -*.env -*.json +.vscode + +temp.py *.db diff --git a/decryptor.py b/decryptor.py deleted file mode 100644 index 055436f..0000000 --- a/decryptor.py +++ /dev/null @@ -1,28 +0,0 @@ -import requests -from cryptography.fernet import Fernet - -from vaultapi.models import session -from vaultapi.squire import load_env - -env = load_env() -session.fernet = Fernet(env.secret) - -BASE_URL = f"http://{env.host}:{env.port}" -assert requests.get(f"{BASE_URL}/health").status_code == 200 - - -def get_secret(filename: str, filepath: str): - """Get secret file contents from the API. - - Args: - filename: Filename to source secrets. - filepath: Parent directory path for the secrets file. - """ - get_url = f"{BASE_URL}/get-secret" - response = requests.get( - url=get_url, - params={"filename": filename, "filepath": filepath}, - headers={"Authorization": f"Bearer {env.apikey}"}, - ) - encrypted = response.json().get("detail") - print(session.fernet.decrypt(encrypted).decode()) diff --git a/encryptor.py b/encryptor.py deleted file mode 100644 index 2417cee..0000000 --- a/encryptor.py +++ /dev/null @@ -1,4 +0,0 @@ -import vaultapi - -if __name__ == "__main__": - vaultapi.start() diff --git a/pyproject.toml b/pyproject.toml index c113497..78b60ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ requires-python = ">=3.10" packages = ["vaultapi"] [tool.setuptools.dynamic] -version = {attr = "vaultapi.version"} +version = {attr = "vaultapi.version.__version__"} dependencies = { file = ["requirements.txt"] } [project.optional-dependencies] diff --git a/vaultapi/main.py b/vaultapi/main.py index 9328e09..e90e149 100644 --- a/vaultapi/main.py +++ b/vaultapi/main.py @@ -2,6 +2,7 @@ import pathlib import uvicorn +from cryptography.fernet import Fernet from fastapi import FastAPI from . import models, routers, squire, version @@ -29,7 +30,7 @@ def start(**kwargs) -> None: log_config: Logging configuration as a dict or a FilePath. Supports .yaml/.yml, .json or .ini formats. """ models.env = squire.load_env(**kwargs) - # models.session.fernet = Fernet(models.env.secret) + models.session.fernet = Fernet(models.env.secret) models.database = models.Database(models.env.database) models.database.create_table("default", ["key", "value"]) module_name = pathlib.Path(__file__) diff --git a/vaultapi/models.py b/vaultapi/models.py index 5e5f872..0eca6c1 100644 --- a/vaultapi/models.py +++ b/vaultapi/models.py @@ -1,13 +1,62 @@ import pathlib +import re import socket import sqlite3 from typing import Any, Dict, List, Set, Tuple from cryptography.fernet import Fernet -from pydantic import BaseModel, Field, FilePath, PositiveInt +from pydantic import BaseModel, Field, FilePath, NewPath, PositiveInt, field_validator from pydantic_settings import BaseSettings +def complexity_checker(secret: str, simple: bool = False) -> None: + """Verifies the strength of a secret. + + Args: + secret: Value of the secret. + simple: Boolean flag to increase complexity. + + See Also: + A secret is considered strong if it at least has: + + - 32 characters + - 1 digit + - 1 symbol + - 1 uppercase letter + - 1 lowercase letter + + Raises: + AssertionError: When at least 1 of the above conditions fail to match. + """ + char_limit = 8 if simple else 32 + + # calculates the length + assert ( + len(secret) >= char_limit + ), f"Minimum secret length is {char_limit}, received {len(secret)}" + + # searches for digits + assert re.search(r"\d", secret), "secret must include an integer" + + if simple: + return + + # searches for uppercase + assert re.search( + r"[A-Z]", secret + ), "secret must include at least one uppercase letter" + + # searches for lowercase + assert re.search( + r"[a-z]", secret + ), "secret must include at least one lowercase letter" + + # searches for symbols + assert re.search( + r"[ !#$%&'()*+,-./[\\\]^_`{|}~" + r'"]', secret + ), "secret must contain at least one special character" + + class Database: """Creates a connection and instantiates the cursor. @@ -81,8 +130,8 @@ class EnvConfig(BaseSettings): """ apikey: str - # secret: str - database: str = Field("secrets.db", pattern=".*.db$") + secret: str + database: FilePath | NewPath | str = Field("secrets.db", pattern=".*.db$") host: str = socket.gethostbyname("localhost") or "0.0.0.0" port: PositiveInt = 8080 workers: PositiveInt = 1 @@ -90,6 +139,44 @@ class EnvConfig(BaseSettings): allowed_origins: List[str] = [] rate_limit: RateLimit | List[RateLimit] = [] + # noinspection PyMethodParameters + @field_validator("apikey", mode="after") + def parse_apikey(cls, value: str | None) -> str | None: + """Parse API key to validate complexity. + + Args: + value: Takes the user input as an argument. + + Returns: + str: + Returns the parsed value. + """ + if value: + try: + complexity_checker(value, True) + except AssertionError as error: + raise ValueError(error.__str__()) + return value + + # noinspection PyMethodParameters + @field_validator("secret", mode="after") + def parse_api_secret(cls, value: str | None) -> str | None: + """Parse API secret to validate complexity. + + Args: + value: Takes the user input as an argument. + + Returns: + str: + Returns the parsed value. + """ + if value: + try: + complexity_checker(value) + except AssertionError as error: + raise ValueError(error.__str__()) + return value + @classmethod def from_env_file(cls, env_file: pathlib.Path) -> "EnvConfig": """Create Settings instance from environment file. diff --git a/vaultapi/payload.py b/vaultapi/payload.py index 0d6a1b0..a233a51 100644 --- a/vaultapi/payload.py +++ b/vaultapi/payload.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel, FilePath, DirectoryPath +from pydantic import BaseModel class DeleteSecret(BaseModel): diff --git a/vaultapi/routers.py b/vaultapi/routers.py index 9f005c7..08d4992 100644 --- a/vaultapi/routers.py +++ b/vaultapi/routers.py @@ -57,7 +57,8 @@ async def get_secret( await auth.validate(request, apikey) if value := await retrieve_existing(key, table_name): LOGGER.info("Secret value for '%s' was retrieved", key) - raise exceptions.APIResponse(status_code=HTTPStatus.OK.real, detail=value) + decrypted = models.session.fernet.decrypt(value).decode(encoding="UTF-8") + raise exceptions.APIResponse(status_code=HTTPStatus.OK.real, detail=decrypted) LOGGER.info("Secret value for '%s' NOT found in the datastore", key) raise exceptions.APIResponse( status_code=HTTPStatus.NOT_FOUND.real, detail=HTTPStatus.NOT_FOUND.phrase @@ -86,8 +87,13 @@ async def put_secret( if await retrieve_existing(data.key, data.table_name): LOGGER.info("Secret value for '%s' will be overridden", data.key) else: - LOGGER.info("Storing a secret value for '%s' in the datastore", data.key) - database.put_secret(key=data.key, value=data.value, table_name=data.table_name) + LOGGER.info( + "Storing a secret value for '%s' to the table '%s' in the datastore", + data.key, + data.table_name, + ) + encrypted = models.session.fernet.encrypt(data.value.encode(encoding="UTF-8")) + database.put_secret(key=data.key, value=encrypted, table_name=data.table_name) raise exceptions.APIResponse( status_code=HTTPStatus.OK.real, detail=HTTPStatus.OK.phrase )