Skip to content

Commit

Permalink
SyncS3Gateway (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervdw authored May 28, 2024
1 parent e198bf6 commit 08394a6
Show file tree
Hide file tree
Showing 13 changed files with 699 additions and 112 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
- name: Install python dependencies
run: |
pip install --disable-pip-version-check --upgrade pip setuptools
pip install -e .[dramatiq,fastapi,auth,celery,fluentbit,sql,sql_sync,s3,api_client,amqp,nanoid,test] ${{ matrix.pins }}
pip install -e .[dramatiq,fastapi,auth,celery,fluentbit,sql,sql_sync,s3,s3_sync,api_client,amqp,nanoid,test] ${{ matrix.pins }}
pip list
- name: Run tests
Expand Down
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## 0.14.1 (unreleased)
----------------------

- Nothing changed yet.
- Added synchronous S3 interface (installable through optional `s3_sync` dependency).


## 0.14.0 (2024-05-22)
Expand Down
3 changes: 3 additions & 0 deletions clean_python/s3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from .key_mapper import * # NOQA
from .s3_bucket_options import * # NOQA
from .s3_gateway import * # NOQA
from .s3_provider import * # NOQA
from .sync_s3_gateway import * # NOQA
from .sync_s3_provider import * # NOQA
11 changes: 11 additions & 0 deletions clean_python/s3/s3_bucket_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from clean_python import ValueObject

__all__ = ["S3BucketOptions"]


class S3BucketOptions(ValueObject):
    """Connection settings for a single S3 bucket.

    NOTE(review): consumed by the S3 bucket providers in this package to
    construct (aio)boto3 clients; the url scheme also determines SSL usage
    there (see SyncS3BucketProvider.client).
    """

    url: str  # S3 endpoint URL, e.g. "https://s3.amazonaws.com"
    access_key: str  # AWS access key id
    secret_key: str  # AWS secret access key
    bucket: str  # name of the bucket
    region: str | None = None  # optional AWS region name
17 changes: 7 additions & 10 deletions clean_python/s3/s3_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
import logging
from typing import TYPE_CHECKING

import aioboto3
try:
import aioboto3
except ImportError:
aioboto3 = None

from botocore.client import Config

from clean_python import ValueObject
from .s3_bucket_options import S3BucketOptions

if TYPE_CHECKING:
from types_aiobotocore_s3.client import S3Client
Expand All @@ -16,14 +20,6 @@
logger = logging.getLogger(__name__)


class S3BucketOptions(ValueObject):
url: str
access_key: str
secret_key: str
bucket: str
region: str | None = None


class S3BucketProvider:
def __init__(self, options: S3BucketOptions):
self.options = options
Expand All @@ -34,6 +30,7 @@ def bucket(self) -> str:

@property
def client(self) -> "S3Client":
assert aioboto3 is not None
session = aioboto3.Session()
return session.client(
"s3",
Expand Down
212 changes: 212 additions & 0 deletions clean_python/s3/sync_s3_gateway.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
# This module is a copy paste of s3_gateway.py

import logging
from pathlib import Path

import inject
from botocore.exceptions import ClientError
from pydantic import AnyHttpUrl

from clean_python import ctx
from clean_python import DoesNotExist
from clean_python import Filter
from clean_python import Id
from clean_python import Json
from clean_python import PageOptions
from clean_python import SyncGateway

from .sync_s3_provider import SyncS3BucketProvider

DEFAULT_EXPIRY = 3600 # in seconds
DEFAULT_TIMEOUT = 1.0
AWS_LIMIT = 1000 # max s3 keys per request


__all__ = ["SyncS3Gateway"]

logger = logging.getLogger(__name__)


class SyncS3Gateway(SyncGateway):
"""The interface to S3 Buckets.
The standard Gateway interface is only partially implemented:
- get() and filter() return metadata
- add(), update(), upsert() are not implemented
- remove() works as expected
For actually getting the object data either use the download_file()
or upload_file() or create a presigned url and hand that over to
the client.
"""

def __init__(
self,
provider_override: SyncS3BucketProvider | None = None,
multitenant: bool = False,
):
self.provider_override = provider_override
self.multitenant = multitenant

@property
def provider(self):
return self.provider_override or inject.instance(SyncS3BucketProvider)

def _id_to_key(self, id: Id) -> str:
if not self.multitenant:
return str(id)
if ctx.tenant is None:
raise RuntimeError(f"{self.__class__} requires a tenant in the context")
return f"tenant-{ctx.tenant.id}/{id}"

def _key_to_id(self, key: str) -> Id:
return key.split("/", 1)[1] if self.multitenant else key

def get(self, id: Id) -> Json | None:
try:
result = self.provider.client.head_object(
Bucket=self.provider.bucket, Key=self._id_to_key(id)
)
except ClientError as e:
if e.response["Error"]["Code"] == "404":
return None
else:
raise e
return {
"id": str(id),
"last_modified": result["LastModified"],
"etag": result["ETag"].strip('"'),
"size": result["ContentLength"],
}

def filter(
self,
filters: list[Filter],
params: PageOptions | None = PageOptions(limit=AWS_LIMIT),
) -> list[Json]:
assert params is not None, "pagination is required for S3Gateway"
assert params.limit <= AWS_LIMIT, f"max {AWS_LIMIT} keys for S3Gateway"
assert params.offset == 0, "no 'offset' pagination for S3Gateway"
assert params.order_by == "id", "can order by 'id' only for S3Gateway"
kwargs = {
"Bucket": self.provider.bucket,
"MaxKeys": params.limit,
"Prefix": self.filters_to_prefix(filters),
}
if params.cursor is not None:
kwargs["StartAfter"] = self._id_to_key(params.cursor)
result = self.provider.client.list_objects_v2(**kwargs)
# Example response:
# {
# 'Key': 'object-in-s3',
# 'LastModified': datetime.datetime(..., tzinfo=utc),
# 'ETag': '"acbd18db4cc2f85cedef654fccc4a4d8"',
# 'Size': 3, 'StorageClass':
# 'STANDARD',
# 'Owner': {...}
# }
return [
{
"id": self._key_to_id(x["Key"]),
"last_modified": x["LastModified"],
"etag": x["ETag"].strip('"'),
"size": x["Size"],
}
for x in result.get("Contents", [])
]

def remove(self, id: Id) -> bool:
self.provider.client.delete_object(
Bucket=self.provider.bucket,
Key=self._id_to_key(id),
)
# S3 doesn't tell us if the object was there in the first place
return True

def remove_multiple(self, ids: list[Id]) -> None:
if len(ids) == 0:
return
assert len(ids) <= AWS_LIMIT, f"max {AWS_LIMIT} keys for S3Gateway"
self.provider.client.delete_objects(
Bucket=self.provider.bucket,
Delete={
"Objects": [{"Key": self._id_to_key(x)} for x in ids],
"Quiet": True,
},
)

def _create_presigned_url(
self,
id: Id,
client_method: str,
) -> AnyHttpUrl:
return self.provider.client.generate_presigned_url(
client_method,
Params={"Bucket": self.provider.bucket, "Key": self._id_to_key(id)},
ExpiresIn=DEFAULT_EXPIRY,
)

def create_download_url(self, id: Id) -> AnyHttpUrl:
return self._create_presigned_url(id, "get_object")

def create_upload_url(self, id: Id) -> AnyHttpUrl:
return self._create_presigned_url(id, "put_object")

def download_file(self, id: Id, file_path: Path) -> None:
if file_path.exists():
raise FileExistsError()
try:
self.provider.client.download_file(
Bucket=self.provider.bucket,
Key=self._id_to_key(id),
Filename=str(file_path),
)
except ClientError as e:
if e.response["Error"]["Code"] == "404":
file_path.unlink(missing_ok=True)
raise DoesNotExist("object")
else:
raise e

def upload_file(self, id: Id, file_path: Path) -> None:
if not file_path.is_file():
raise FileNotFoundError()
self.provider.client.upload_file(
Bucket=self.provider.bucket,
Key=self._id_to_key(id),
Filename=str(file_path),
)

def filters_to_prefix(self, filters: list[Filter]) -> str:
if len(filters) == 0:
return self._id_to_key("")
elif len(filters) > 1:
raise NotImplementedError("More than 1 filter is not supported")
(filter,) = filters
if filter.field == "prefix":
assert len(filter.values) == 1
return self._id_to_key(filter.values[0])
else:
raise NotImplementedError(f"Unsupported filter '{filter.field}'")

def remove_filtered(self, filters: list[Filter]) -> None:
kwargs = {
"Bucket": self.provider.bucket,
"MaxKeys": AWS_LIMIT,
"Prefix": self.filters_to_prefix(filters),
}
while True:
result = self.provider.client.list_objects_v2(**kwargs)
contents = result.get("Contents", [])
if contents:
self.provider.client.delete_objects(
Bucket=self.provider.bucket,
Delete={
"Objects": [{"Key": x["Key"]} for x in contents],
"Quiet": True,
},
)
if len(contents) < AWS_LIMIT:
break
kwargs["StartAfter"] = contents[-1]["Key"]
44 changes: 44 additions & 0 deletions clean_python/s3/sync_s3_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# (c) Nelen & Schuurmans

import logging
from typing import TYPE_CHECKING

import boto3
from botocore.client import Config

from .s3_bucket_options import S3BucketOptions

if TYPE_CHECKING:
from mypy_boto3_s3.client import S3Client

__all__ = ["S3BucketOptions", "SyncS3BucketProvider"]

logger = logging.getLogger(__name__)


class SyncS3BucketProvider:
    """Provides synchronous boto3 S3 clients built from S3BucketOptions."""

    def __init__(self, options: S3BucketOptions):
        self.options = options

    @property
    def bucket(self) -> str:
        """Name of the configured bucket."""
        return self.options.bucket

    @property
    def client(self) -> "S3Client":
        """Build a fresh boto3 S3 client from the stored options."""
        opts = self.options
        client_config = Config(
            s3={"addressing_style": "virtual"},  # "path" will become deprecated
            signature_version="s3v4",  # for minio
            retries={
                "max_attempts": 4,  # 1 try and up to 3 retries
                "mode": "adaptive",
            },
        )
        return boto3.client(
            "s3",
            endpoint_url=opts.url,
            aws_access_key_id=opts.access_key,
            aws_secret_access_key=opts.secret_key,
            region_name=opts.region,
            config=client_config,
            use_ssl=opts.url.startswith("https"),
        )
Loading

0 comments on commit 08394a6

Please sign in to comment.