Skip to content

Commit

Permalink
Encrypt secrets when stored in database
Browse files Browse the repository at this point in the history
Include a complexity checker for API key and secret
  • Loading branch information
dormant-user committed Sep 16, 2024
1 parent 543d92a commit 1f5c811
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 43 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

*.env
*.json
.vscode

temp.py
*.db
28 changes: 0 additions & 28 deletions decryptor.py

This file was deleted.

4 changes: 0 additions & 4 deletions encryptor.py

This file was deleted.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ requires-python = ">=3.10"
packages = ["vaultapi"]

[tool.setuptools.dynamic]
version = {attr = "vaultapi.version"}
version = {attr = "vaultapi.version.__version__"}
dependencies = { file = ["requirements.txt"] }

[project.optional-dependencies]
Expand Down
3 changes: 2 additions & 1 deletion vaultapi/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pathlib

import uvicorn
from cryptography.fernet import Fernet
from fastapi import FastAPI

from . import models, routers, squire, version
Expand Down Expand Up @@ -29,7 +30,7 @@ def start(**kwargs) -> None:
log_config: Logging configuration as a dict or a FilePath. Supports .yaml/.yml, .json or .ini formats.
"""
models.env = squire.load_env(**kwargs)
# models.session.fernet = Fernet(models.env.secret)
models.session.fernet = Fernet(models.env.secret)
models.database = models.Database(models.env.database)
models.database.create_table("default", ["key", "value"])
module_name = pathlib.Path(__file__)
Expand Down
93 changes: 90 additions & 3 deletions vaultapi/models.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,62 @@
import pathlib
import re
import socket
import sqlite3
from typing import Any, Dict, List, Set, Tuple

from cryptography.fernet import Fernet
from pydantic import BaseModel, Field, FilePath, PositiveInt
from pydantic import BaseModel, Field, FilePath, NewPath, PositiveInt, field_validator
from pydantic_settings import BaseSettings


def complexity_checker(secret: str, simple: bool = False) -> None:
"""Verifies the strength of a secret.
Args:
secret: Value of the secret.
simple: Boolean flag to increase complexity.
See Also:
A secret is considered strong if it at least has:
- 32 characters
- 1 digit
- 1 symbol
- 1 uppercase letter
- 1 lowercase letter
Raises:
AssertionError: When at least 1 of the above conditions fail to match.
"""
char_limit = 8 if simple else 32

# calculates the length
assert (
len(secret) >= char_limit
), f"Minimum secret length is {char_limit}, received {len(secret)}"

# searches for digits
assert re.search(r"\d", secret), "secret must include an integer"

if simple:
return

# searches for uppercase
assert re.search(
r"[A-Z]", secret
), "secret must include at least one uppercase letter"

# searches for lowercase
assert re.search(
r"[a-z]", secret
), "secret must include at least one lowercase letter"

# searches for symbols
assert re.search(
r"[ !#$%&'()*+,-./[\\\]^_`{|}~" + r'"]', secret
), "secret must contain at least one special character"


class Database:
"""Creates a connection and instantiates the cursor.
Expand Down Expand Up @@ -81,15 +130,53 @@ class EnvConfig(BaseSettings):
"""

apikey: str
# secret: str
database: str = Field("secrets.db", pattern=".*.db$")
secret: str
database: FilePath | NewPath | str = Field("secrets.db", pattern=".*.db$")
host: str = socket.gethostbyname("localhost") or "0.0.0.0"
port: PositiveInt = 8080
workers: PositiveInt = 1
log_config: FilePath | Dict[str, Any] | None = None
allowed_origins: List[str] = []
rate_limit: RateLimit | List[RateLimit] = []

# noinspection PyMethodParameters
@field_validator("apikey", mode="after")
def parse_apikey(cls, value: str | None) -> str | None:
"""Parse API key to validate complexity.
Args:
value: Takes the user input as an argument.
Returns:
str:
Returns the parsed value.
"""
if value:
try:
complexity_checker(value, True)
except AssertionError as error:
raise ValueError(error.__str__())
return value

# noinspection PyMethodParameters
@field_validator("secret", mode="after")
def parse_api_secret(cls, value: str | None) -> str | None:
"""Parse API secret to validate complexity.
Args:
value: Takes the user input as an argument.
Returns:
str:
Returns the parsed value.
"""
if value:
try:
complexity_checker(value)
except AssertionError as error:
raise ValueError(error.__str__())
return value

@classmethod
def from_env_file(cls, env_file: pathlib.Path) -> "EnvConfig":
"""Create Settings instance from environment file.
Expand Down
2 changes: 1 addition & 1 deletion vaultapi/payload.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pydantic import BaseModel, FilePath, DirectoryPath
from pydantic import BaseModel


class DeleteSecret(BaseModel):
Expand Down
12 changes: 9 additions & 3 deletions vaultapi/routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ async def get_secret(
await auth.validate(request, apikey)
if value := await retrieve_existing(key, table_name):
LOGGER.info("Secret value for '%s' was retrieved", key)
raise exceptions.APIResponse(status_code=HTTPStatus.OK.real, detail=value)
decrypted = models.session.fernet.decrypt(value).decode(encoding="UTF-8")
raise exceptions.APIResponse(status_code=HTTPStatus.OK.real, detail=decrypted)
LOGGER.info("Secret value for '%s' NOT found in the datastore", key)
raise exceptions.APIResponse(
status_code=HTTPStatus.NOT_FOUND.real, detail=HTTPStatus.NOT_FOUND.phrase
Expand Down Expand Up @@ -86,8 +87,13 @@ async def put_secret(
if await retrieve_existing(data.key, data.table_name):
LOGGER.info("Secret value for '%s' will be overridden", data.key)
else:
LOGGER.info("Storing a secret value for '%s' in the datastore", data.key)
database.put_secret(key=data.key, value=data.value, table_name=data.table_name)
LOGGER.info(
"Storing a secret value for '%s' to the table '%s' in the datastore",
data.key,
data.table_name,
)
encrypted = models.session.fernet.encrypt(data.value.encode(encoding="UTF-8"))
database.put_secret(key=data.key, value=encrypted, table_name=data.table_name)
raise exceptions.APIResponse(
status_code=HTTPStatus.OK.real, detail=HTTPStatus.OK.phrase
)
Expand Down

0 comments on commit 1f5c811

Please sign in to comment.