Skip to content

Commit

Permalink
pyatls: initial commit (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
HernanGatta authored Aug 15, 2023
1 parent b56b741 commit 31b4993
Show file tree
Hide file tree
Showing 16 changed files with 633 additions and 4 deletions.
3 changes: 0 additions & 3 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ ignore =
# Causing false positive on list slices
# https://github.com/PyCQA/pycodestyle/issues/373
E203,
# E266: Too many leading '#' for block comment
# This rule is too strict for comment blocks that we currently have
E266,
# W503: Line break occurred before a binary operator
# PEP8 now recommend line breaks should occur before the binary operator
W503
Expand Down
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.mypy_cache
.vscode
**/__pycache__
**/*.egg-info
build
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ warn_unreachable = true
# Per-module options:

[[tool.mypy.overrides]]
module = ["requests"]
module = ["requests", "OpenSSL", "OpenSSL.crypto", "OpenSSL.SSL"]
# Ignore "missing library stubs or py.typed marker" for all the above modules
ignore_missing_imports = true
1 change: 1 addition & 0 deletions python-package/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# PyATLS
16 changes: 16 additions & 0 deletions python-package/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[project]
name = "pyatls"
version = "0.0.1"
description = "A Python package that implements Attested TLS (aTLS)."
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"cryptography==41.0.3",
"PyJWT==2.8.0",
"pyOpenSSL==23.2.0",
"urllib3==1.26.16",
]

[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
4 changes: 4 additions & 0 deletions python-package/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cryptography==41.0.3
PyJWT==2.8.0
pyOpenSSL==23.2.0
urllib3==1.26.16
21 changes: 21 additions & 0 deletions python-package/sample/aci_debug_policy.rego
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package policy

api_svn := "0.10.0"

mount_device := {"allowed": true}
mount_overlay := {"allowed": true}
create_container := {"allowed": true, "env_list": null, "allow_stdio_access": true}
unmount_device := {"allowed": true}
unmount_overlay := {"allowed": true}
exec_in_container := {"allowed": true, "env_list": null}
exec_external := {"allowed": true, "env_list": null, "allow_stdio_access": true}
shutdown_container := {"allowed": true}
signal_container_process := {"allowed": true}
plan9_mount := {"allowed": true}
plan9_unmount := {"allowed": true}
get_properties := {"allowed": true}
dump_stacks := {"allowed": true}
runtime_logging := {"allowed": true}
load_fragment := {"allowed": true}
scratch_mount := {"allowed": true}
scratch_unmount := {"allowed": true}
97 changes: 97 additions & 0 deletions python-package/sample/connect_aci.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Sample usage of the PyATLS package
#
# Suppose a simple HTTP server with a single GET endpoint /index is running
# over aTLS in an AMD SEV-SNP-backed Azure ACI container instance on HOST:PORT.
# Suppose further that this service is issuing attestation documents via the
# Azure Attestation Service (AAS), particularly using the publicly available
# endpoint in the East US 2 region, and is running with the debug Confidential
# Computing Enforcement (CCE) policy. Then, you can run this program as
# follows from the directory where this file is located:
#
# python3 connect_aci.py \
# --host HOST \
# --port PORT \
# --policy aci_debug_policy.rego \
# --jku https://sharedeus2.eus2.attest.azure.net/certs \
# --method GET \
# --url /index

import argparse
from typing import List, Optional

from pyatls import AttestedHTTPSConnection, AttestedTLSContext
from pyatls.validators import AzAasAciValidator

# Parse arguments
parser = argparse.ArgumentParser()

parser.add_argument(
"--server", required=True, help="IP or hostname to connect to"
)

parser.add_argument(
"--port", default=443, help="port to connect to (default: 443)"
)

parser.add_argument(
"--method",
default="GET",
help="HTTP method to use in the request " "(default: GET)",
)

parser.add_argument(
"--url",
default="/index",
help="URL to perform the HTTP request against " "(default: /index)",
)

parser.add_argument(
"--policy",
nargs="*",
help="path to a CCE policy in Rego format, may be "
"specified multiple times, once for each allowed policy "
"(default: ignore)",
)

parser.add_argument(
"--jku",
nargs="*",
help="allowed JWKS URL to verify the JKU claim in the AAS "
"JWT token against, may be specified multiple times, one "
"for each allowed value (default: ignore)",
)

args = parser.parse_args()

policy_files: Optional[List[str]] = args.policy
jkus: Optional[List[str]] = args.jku

# Read in the specified Rego policies, if any.
policies: Optional[List[str]] = None
if policy_files is not None:
policies = []
for filepath in policy_files:
with open(filepath) as f:
policies.append(f.read())

# Set up the Azure AAS ACI validator:
# - The policies array carries all allowed CCE policies, or none if the policy
# should be ignored.
#
# - The JKUs array carries all allowed JWKS URLs, or none if the JKU claim in
# the AAS JWT token sent by the server during the aTLS handshake should not
# be checked.
validator = AzAasAciValidator(policies=policies, jkus=jkus)

# Set up the aTLS context, including at least one attestation document
# validator (only one need succeed).
ctx = AttestedTLSContext([validator])

# Set up the HTTP request machinery using the aTLS context.
conn = AttestedHTTPSConnection(args.server, ctx, args.port)

# Send the HTTP request, and read and print the response in the usual way.
conn.request(args.method, args.url)
print(conn.getresponse().read().decode())

conn.close()
3 changes: 3 additions & 0 deletions python-package/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import setuptools

setuptools.setup()
4 changes: 4 additions & 0 deletions python-package/src/pyatls/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from pyatls.attested_https_connection import AttestedHTTPSConnection
from pyatls.attested_tls_context import AttestedTLSContext

__all__ = ["AttestedHTTPSConnection", "AttestedTLSContext"]
58 changes: 58 additions & 0 deletions python-package/src/pyatls/attested_https_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import socket
from http.client import HTTPS_PORT, HTTPConnection
from typing import Optional, Tuple

from pyatls import AttestedTLSContext


class AttestedHTTPSConnection(HTTPConnection):
"""
Performs HTTP requests over an Attested TLS (aTLS) connection. It is
equivalent to HTTPSConnection, but the underlying transport is aTLS instead
of standard TLS.
Parameters
----------
host : str
IP address or hostname to connect to.
context : AttestedTLSContext
An aTLS context that performs the aTLS handshake.
port : int, optional
Port to connect to.
timeout : int
Timeout for the attempt to connect to the host on the specified port.
source_address : tuple of str and int, optional
A pair of (host, port) for the client socket to bind to before
connecting to the remote host.
blocksize : int
Size in bytes of blocks when sending and receiving data to and from the
remote host, respectively.
"""

default_port = HTTPS_PORT

def __init__(
self,
host: str,
context: AttestedTLSContext,
port: Optional[int] = None,
timeout: int = socket._GLOBAL_DEFAULT_TIMEOUT, # type: ignore
source_address: Optional[Tuple[str, int]] = None,
blocksize: int = 8192,
) -> None:
super().__init__(host, port, timeout, source_address, blocksize)

if not isinstance(context, AttestedTLSContext):
raise ValueError("context must be an instance of AtlsContext")

self._context = context

def connect(self) -> None:
super().connect()

self.sock = self._context.wrap_socket(self.sock)
113 changes: 113 additions & 0 deletions python-package/src/pyatls/attested_tls_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import base64
import secrets
import socket
import ssl
import warnings
from typing import List, Optional

import OpenSSL.crypto
import OpenSSL.SSL
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.x509.extensions import ExtensionNotFound

# TODO/HEGATTA: Either take the code from urllib3 that wraps PyOpenSSL or ditch
# PyOpenSSL altogether in favor of either modifying Python's SSL module to
# support custom certificate validation or switching to mbedTLS (and
# contributing support for custom certificate validation there).
warnings.filterwarnings("ignore", category=DeprecationWarning)
from urllib3.contrib.pyopenssl import PyOpenSSLContext # noqa: E402
from urllib3.contrib.pyopenssl import WrappedSocket # noqa: E402

from .validators import Validator # noqa: E402


class AttestedTLSContext(PyOpenSSLContext):
"""
An SSL context that supports validation of aTLS certificates.
Attention: Because this class manages the aTLS handshake's nonce, you must
use different instances for different connections.
Parameters
----------
validators : list of Validator
A list of one or more evidence or attestation result validators. During
the TLS handshake, each validator in this list is queried for the
certificate extension OID that contains the attestation document that
the validator understands and if a corresponding extension is found in
the peer's certificate, the validator is invoked.
nonce : bytes, optional
A random string of bytes to use as a nonce to ascertain the freshness
of attestation evidence and mitigate replay attacks. If None, a random
nonce is automatically generated.
"""

def __init__(
self, validators: List[Validator], nonce: Optional[bytes] = None
) -> None:
super().__init__(ssl.PROTOCOL_TLSv1_2)

if len(validators) == 0:
raise ValueError("At least one validator is necessary")

if nonce is None:
nonce = secrets.token_bytes(32)

self._validators = validators
self._nonce = nonce

self._ctx.set_verify(OpenSSL.SSL.VERIFY_PEER, self._verify_certificate)

def _verify_certificate(
self,
conn: OpenSSL.SSL.Connection,
x509: OpenSSL.crypto.X509,
err_no: int,
err_depth: int,
return_code: int,
) -> bool:
"""OpenSSL certificate validation callback"""

peer_cert = x509.to_cryptography()

for validator in self._validators:
if not isinstance(validator, Validator):
raise ValueError("A specified validator is of the wrong type")

id = validator.get_identifier()

try:
ext = peer_cert.extensions.get_extension_for_oid(id)
except ExtensionNotFound:
continue

document = ext.value.value
pub = peer_cert.public_key()
spki = pub.public_bytes(
Encoding.DER, PublicFormat.SubjectPublicKeyInfo
)

try:
return validator.validate(document, spki, self._nonce)
except Exception:
continue

return False

def wrap_socket(self, sock: socket.socket) -> WrappedSocket:
sni = base64.encodebytes(self._nonce)

return super().wrap_socket(sock, False, True, True, sni)

@property
def validators(self) -> List[Validator]:
return self._validators

@validators.setter
def validators(self, validators: List[Validator]) -> None:
self._validators = validators

@property
def nonce(self) -> bytes:
return self._nonce
10 changes: 10 additions & 0 deletions python-package/src/pyatls/validators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from pyatls.validators.az_aas_aci_validator import AzAasAciValidator
from pyatls.validators.az_aas_cvm_validator import AzAasCvmValidator
from pyatls.validators.validator import SecurityWarning, Validator

__all__ = [
"AzAasAciValidator",
"AzAasCvmValidator",
"SecurityWarning",
"Validator",
]
Loading

0 comments on commit 31b4993

Please sign in to comment.